1use std::collections::HashMap;
16use std::sync::Arc;
17use std::time::{Duration, Instant, UNIX_EPOCH};
18
19use api::v1::value::ValueData;
20use api::v1::{
21 ColumnDataType, ColumnDef, ColumnSchema, CreateTableExpr, Row, RowInsertRequest,
22 RowInsertRequests, Rows, SemanticType,
23};
24use catalog::CatalogManagerRef;
25use common_catalog::consts::{default_engine, DEFAULT_PRIVATE_SCHEMA_NAME};
26use common_telemetry::logging::{SlowQueriesRecordType, SlowQueryOptions};
27use common_telemetry::{debug, error, info, slow};
28use common_time::timestamp::{TimeUnit, Timestamp};
29use operator::insert::InserterRef;
30use operator::statement::StatementExecutorRef;
31use query::parser::QueryStatement;
32use rand::random;
33use session::context::{QueryContextBuilder, QueryContextRef};
34use snafu::ResultExt;
35use store_api::mito_engine_options::{APPEND_MODE_KEY, TTL_KEY};
36use table::TableRef;
37use tokio::sync::mpsc::{channel, Receiver, Sender};
38use tokio::task::JoinHandle;
39
40use crate::error::{CatalogSnafu, Result, TableOperationSnafu};
41
/// Name of the system table that slow query records are written to.
const SLOW_QUERY_TABLE_NAME: &str = "slow_queries";
// Column names of the slow query system table.
const SLOW_QUERY_TABLE_COST_COLUMN_NAME: &str = "cost";
const SLOW_QUERY_TABLE_THRESHOLD_COLUMN_NAME: &str = "threshold";
const SLOW_QUERY_TABLE_QUERY_COLUMN_NAME: &str = "query";
const SLOW_QUERY_TABLE_TIMESTAMP_COLUMN_NAME: &str = "timestamp";
const SLOW_QUERY_TABLE_IS_PROMQL_COLUMN_NAME: &str = "is_promql";
const SLOW_QUERY_TABLE_PROMQL_START_COLUMN_NAME: &str = "promql_start";
const SLOW_QUERY_TABLE_PROMQL_END_COLUMN_NAME: &str = "promql_end";
const SLOW_QUERY_TABLE_PROMQL_RANGE_COLUMN_NAME: &str = "promql_range";
const SLOW_QUERY_TABLE_PROMQL_STEP_COLUMN_NAME: &str = "promql_step";

/// TTL table option used for the system table when the options do not configure one.
const DEFAULT_SLOW_QUERY_TABLE_TTL: &str = "30d";
/// Capacity of the channel that carries slow query events to the background task.
const DEFAULT_SLOW_QUERY_EVENTS_CHANNEL_SIZE: usize = 1024;
55
/// Handle used to time queries and report the slow ones.
///
/// `SlowQueryRecorder::new` spawns a background task that receives slow query
/// events over a channel; this struct keeps the sending half of that channel.
#[derive(Clone)]
pub struct SlowQueryRecorder {
    /// Sending half of the event channel; a clone is handed to every `SlowQueryTimer`.
    tx: Sender<SlowQueryEvent>,
    /// Options that control whether queries are timed, the threshold and the sampling ratio.
    slow_query_opts: SlowQueryOptions,
    /// Join handle of the background task spawned in `new`; it is stored but never read.
    _handle: Arc<JoinHandle<()>>,
}
63
/// A single slow query observation sent from a `SlowQueryTimer` to the background handler.
#[derive(Debug)]
struct SlowQueryEvent {
    /// Elapsed time of the query, in milliseconds.
    cost: u64,
    /// Threshold the query exceeded, in milliseconds.
    threshold: u64,
    /// Text of the query (SQL statement or PromQL expression).
    query: String,
    /// `true` when the statement was a PromQL query.
    is_promql: bool,
    /// Context of the query; its current catalog selects where the record is stored.
    query_ctx: QueryContextRef,
    /// PromQL only: `end - start`, in milliseconds.
    promql_range: Option<u64>,
    /// PromQL only: evaluation interval, in milliseconds.
    promql_step: Option<u64>,
    /// PromQL only: start time as milliseconds since the Unix epoch.
    promql_start: Option<i64>,
    /// PromQL only: end time as milliseconds since the Unix epoch.
    promql_end: Option<i64>,
}
76
77impl SlowQueryRecorder {
78 pub fn new(
80 slow_query_opts: SlowQueryOptions,
81 inserter: InserterRef,
82 statement_executor: StatementExecutorRef,
83 catalog_manager: CatalogManagerRef,
84 ) -> Self {
85 let (tx, rx) = channel(DEFAULT_SLOW_QUERY_EVENTS_CHANNEL_SIZE);
86
87 let ttl = slow_query_opts
88 .ttl
89 .clone()
90 .unwrap_or(DEFAULT_SLOW_QUERY_TABLE_TTL.to_string());
91
92 let event_handler = SlowQueryEventHandler {
94 inserter,
95 statement_executor,
96 catalog_manager,
97 rx,
98 record_type: slow_query_opts.record_type,
99 ttl,
100 };
101
102 let handle = tokio::spawn(async move {
104 event_handler.process_slow_query().await;
105 });
106
107 Self {
108 tx,
109 slow_query_opts,
110 _handle: Arc::new(handle),
111 }
112 }
113
114 pub fn start(
117 &self,
118 stmt: QueryStatement,
119 query_ctx: QueryContextRef,
120 ) -> Option<SlowQueryTimer> {
121 if self.slow_query_opts.enable {
122 Some(SlowQueryTimer {
123 stmt,
124 query_ctx,
125 start: Instant::now(), threshold: self.slow_query_opts.threshold,
127 sample_ratio: self.slow_query_opts.sample_ratio,
128 tx: self.tx.clone(),
129 })
130 } else {
131 None
132 }
133 }
134}
135
/// State of the background task that receives slow query events and records them.
struct SlowQueryEventHandler {
    /// Used to write rows into the slow query system table.
    inserter: InserterRef,
    /// Used to create the system table and passed along with row inserts.
    statement_executor: StatementExecutorRef,
    /// Used to look up the slow query system table.
    catalog_manager: CatalogManagerRef,
    /// Receiving half of the slow query event channel.
    rx: Receiver<SlowQueryEvent>,
    /// Where slow queries are recorded: the log or the system table.
    record_type: SlowQueriesRecordType,
    /// TTL table option applied when the system table is created.
    ttl: String,
}
144
145impl SlowQueryEventHandler {
146 async fn process_slow_query(mut self) {
147 info!(
148 "Start the background handler to process slow query events and record them in {:?}.",
149 self.record_type
150 );
151 while let Some(event) = self.rx.recv().await {
152 self.record_slow_query(event).await;
153 }
154 }
155
156 async fn record_slow_query(&self, event: SlowQueryEvent) {
157 match self.record_type {
158 SlowQueriesRecordType::Log => {
159 slow!(
161 cost = event.cost,
162 threshold = event.threshold,
163 query = event.query,
164 is_promql = event.is_promql,
165 promql_range = event.promql_range,
166 promql_step = event.promql_step,
167 promql_start = event.promql_start,
168 promql_end = event.promql_end,
169 );
170 }
171 SlowQueriesRecordType::SystemTable => {
172 if let Err(e) = self.insert_slow_query(&event).await {
174 error!(e; "Failed to insert slow query, query: {:?}", event);
175 }
176 }
177 }
178 }
179
180 async fn insert_slow_query(&self, event: &SlowQueryEvent) -> Result<()> {
181 debug!("Handle the slow query event: {:?}", event);
182
183 let table = if let Some(table) = self
184 .catalog_manager
185 .table(
186 event.query_ctx.current_catalog(),
187 DEFAULT_PRIVATE_SCHEMA_NAME,
188 SLOW_QUERY_TABLE_NAME,
189 Some(&event.query_ctx),
190 )
191 .await
192 .context(CatalogSnafu)?
193 {
194 table
195 } else {
196 self.create_system_table(event.query_ctx.clone()).await?
198 };
199
200 let insert = RowInsertRequest {
201 table_name: SLOW_QUERY_TABLE_NAME.to_string(),
202 rows: Some(Rows {
203 schema: self.build_insert_column_schema(),
204 rows: vec![Row {
205 values: vec![
206 ValueData::U64Value(event.cost).into(),
207 ValueData::U64Value(event.threshold).into(),
208 ValueData::StringValue(event.query.to_string()).into(),
209 ValueData::BoolValue(event.is_promql).into(),
210 ValueData::TimestampNanosecondValue(
211 Timestamp::current_time(TimeUnit::Nanosecond).value(),
212 )
213 .into(),
214 ValueData::U64Value(event.promql_range.unwrap_or(0)).into(),
215 ValueData::U64Value(event.promql_step.unwrap_or(0)).into(),
216 ValueData::TimestampMillisecondValue(event.promql_start.unwrap_or(0))
217 .into(),
218 ValueData::TimestampMillisecondValue(event.promql_end.unwrap_or(0)).into(),
219 ],
220 }],
221 }),
222 };
223
224 let requests = RowInsertRequests {
225 inserts: vec![insert],
226 };
227
228 let table_info = table.table_info();
229 let query_ctx = QueryContextBuilder::default()
230 .current_catalog(table_info.catalog_name.to_string())
231 .current_schema(table_info.schema_name.to_string())
232 .build()
233 .into();
234
235 self.inserter
236 .handle_row_inserts(requests, query_ctx, &self.statement_executor, false)
237 .await
238 .context(TableOperationSnafu)?;
239
240 Ok(())
241 }
242
243 async fn create_system_table(&self, query_ctx: QueryContextRef) -> Result<TableRef> {
244 let mut create_table_expr = self.build_create_table_expr(query_ctx.current_catalog());
245 if let Some(table) = self
246 .catalog_manager
247 .table(
248 &create_table_expr.catalog_name,
249 &create_table_expr.schema_name,
250 &create_table_expr.table_name,
251 Some(&query_ctx),
252 )
253 .await
254 .context(CatalogSnafu)?
255 {
256 return Ok(table);
258 }
259
260 let table = self
262 .statement_executor
263 .create_table_inner(&mut create_table_expr, None, query_ctx.clone())
264 .await
265 .context(TableOperationSnafu)?;
266
267 info!(
268 "Create the {} system table in {:?} successfully.",
269 SLOW_QUERY_TABLE_NAME, DEFAULT_PRIVATE_SCHEMA_NAME
270 );
271
272 Ok(table)
273 }
274
275 fn build_create_table_expr(&self, catalog: &str) -> CreateTableExpr {
276 let column_defs = vec![
277 ColumnDef {
278 name: SLOW_QUERY_TABLE_COST_COLUMN_NAME.to_string(),
279 data_type: ColumnDataType::Uint64 as i32,
280 is_nullable: false,
281 default_constraint: vec![],
282 semantic_type: SemanticType::Field as i32,
283 comment: "The cost of the slow query in milliseconds".to_string(),
284 datatype_extension: None,
285 options: None,
286 },
287 ColumnDef {
288 name: SLOW_QUERY_TABLE_THRESHOLD_COLUMN_NAME.to_string(),
289 data_type: ColumnDataType::Uint64 as i32,
290 is_nullable: false,
291 default_constraint: vec![],
292 semantic_type: SemanticType::Field as i32,
293 comment:
294 "When the query cost exceeds this value, it will be recorded as a slow query"
295 .to_string(),
296 datatype_extension: None,
297 options: None,
298 },
299 ColumnDef {
300 name: SLOW_QUERY_TABLE_QUERY_COLUMN_NAME.to_string(),
301 data_type: ColumnDataType::String as i32,
302 is_nullable: false,
303 default_constraint: vec![],
304 semantic_type: SemanticType::Field as i32,
305 comment: "The original query statement".to_string(),
306 datatype_extension: None,
307 options: None,
308 },
309 ColumnDef {
310 name: SLOW_QUERY_TABLE_IS_PROMQL_COLUMN_NAME.to_string(),
311 data_type: ColumnDataType::Boolean as i32,
312 is_nullable: false,
313 default_constraint: vec![],
314 semantic_type: SemanticType::Field as i32,
315 comment: "Whether the query is a PromQL query".to_string(),
316 datatype_extension: None,
317 options: None,
318 },
319 ColumnDef {
320 name: SLOW_QUERY_TABLE_TIMESTAMP_COLUMN_NAME.to_string(),
321 data_type: ColumnDataType::TimestampNanosecond as i32,
322 is_nullable: false,
323 default_constraint: vec![],
324 semantic_type: SemanticType::Timestamp as i32,
325 comment: "The timestamp of the slow query".to_string(),
326 datatype_extension: None,
327 options: None,
328 },
329 ColumnDef {
330 name: SLOW_QUERY_TABLE_PROMQL_RANGE_COLUMN_NAME.to_string(),
331 data_type: ColumnDataType::Uint64 as i32,
332 is_nullable: false,
333 default_constraint: vec![],
334 semantic_type: SemanticType::Field as i32,
335 comment: "The time range of the PromQL query in milliseconds".to_string(),
336 datatype_extension: None,
337 options: None,
338 },
339 ColumnDef {
340 name: SLOW_QUERY_TABLE_PROMQL_STEP_COLUMN_NAME.to_string(),
341 data_type: ColumnDataType::Uint64 as i32,
342 is_nullable: false,
343 default_constraint: vec![],
344 semantic_type: SemanticType::Field as i32,
345 comment: "The step of the PromQL query in milliseconds".to_string(),
346 datatype_extension: None,
347 options: None,
348 },
349 ColumnDef {
350 name: SLOW_QUERY_TABLE_PROMQL_START_COLUMN_NAME.to_string(),
351 data_type: ColumnDataType::TimestampMillisecond as i32,
352 is_nullable: false,
353 default_constraint: vec![],
354 semantic_type: SemanticType::Field as i32,
355 comment: "The start timestamp of the PromQL query in milliseconds".to_string(),
356 datatype_extension: None,
357 options: None,
358 },
359 ColumnDef {
360 name: SLOW_QUERY_TABLE_PROMQL_END_COLUMN_NAME.to_string(),
361 data_type: ColumnDataType::TimestampMillisecond as i32,
362 is_nullable: false,
363 default_constraint: vec![],
364 semantic_type: SemanticType::Field as i32,
365 comment: "The end timestamp of the PromQL query in milliseconds".to_string(),
366 datatype_extension: None,
367 options: None,
368 },
369 ];
370
371 let table_options = HashMap::from([
372 (APPEND_MODE_KEY.to_string(), "true".to_string()),
373 (TTL_KEY.to_string(), self.ttl.to_string()),
374 ]);
375
376 CreateTableExpr {
377 catalog_name: catalog.to_string(),
378 schema_name: DEFAULT_PRIVATE_SCHEMA_NAME.to_string(), table_name: SLOW_QUERY_TABLE_NAME.to_string(),
380 desc: "GreptimeDB system table for storing slow queries".to_string(),
381 column_defs,
382 time_index: SLOW_QUERY_TABLE_TIMESTAMP_COLUMN_NAME.to_string(),
383 primary_keys: vec![],
384 create_if_not_exists: true,
385 table_options,
386 table_id: None,
387 engine: default_engine().to_string(),
388 }
389 }
390
391 fn build_insert_column_schema(&self) -> Vec<ColumnSchema> {
392 vec![
393 ColumnSchema {
394 column_name: SLOW_QUERY_TABLE_COST_COLUMN_NAME.to_string(),
395 datatype: ColumnDataType::Uint64.into(),
396 semantic_type: SemanticType::Field.into(),
397 ..Default::default()
398 },
399 ColumnSchema {
400 column_name: SLOW_QUERY_TABLE_THRESHOLD_COLUMN_NAME.to_string(),
401 datatype: ColumnDataType::Uint64.into(),
402 semantic_type: SemanticType::Field.into(),
403 ..Default::default()
404 },
405 ColumnSchema {
406 column_name: SLOW_QUERY_TABLE_QUERY_COLUMN_NAME.to_string(),
407 datatype: ColumnDataType::String.into(),
408 semantic_type: SemanticType::Field.into(),
409 ..Default::default()
410 },
411 ColumnSchema {
412 column_name: SLOW_QUERY_TABLE_IS_PROMQL_COLUMN_NAME.to_string(),
413 datatype: ColumnDataType::Boolean.into(),
414 semantic_type: SemanticType::Field.into(),
415 ..Default::default()
416 },
417 ColumnSchema {
418 column_name: SLOW_QUERY_TABLE_TIMESTAMP_COLUMN_NAME.to_string(),
419 datatype: ColumnDataType::TimestampNanosecond.into(),
420 semantic_type: SemanticType::Timestamp.into(),
421 ..Default::default()
422 },
423 ColumnSchema {
424 column_name: SLOW_QUERY_TABLE_PROMQL_RANGE_COLUMN_NAME.to_string(),
425 datatype: ColumnDataType::Uint64.into(),
426 semantic_type: SemanticType::Field.into(),
427 ..Default::default()
428 },
429 ColumnSchema {
430 column_name: SLOW_QUERY_TABLE_PROMQL_STEP_COLUMN_NAME.to_string(),
431 datatype: ColumnDataType::Uint64.into(),
432 semantic_type: SemanticType::Field.into(),
433 ..Default::default()
434 },
435 ColumnSchema {
436 column_name: SLOW_QUERY_TABLE_PROMQL_START_COLUMN_NAME.to_string(),
437 datatype: ColumnDataType::TimestampMillisecond.into(),
438 semantic_type: SemanticType::Field.into(),
439 ..Default::default()
440 },
441 ColumnSchema {
442 column_name: SLOW_QUERY_TABLE_PROMQL_END_COLUMN_NAME.to_string(),
443 datatype: ColumnDataType::TimestampMillisecond.into(),
444 semantic_type: SemanticType::Field.into(),
445 ..Default::default()
446 },
447 ]
448 }
449}
450
/// Timer created by `SlowQueryRecorder::start` for a single query.
///
/// When dropped, it compares the time elapsed since `start` with `threshold` and, if the
/// query was slower and is selected by sampling, sends a slow query event over `tx`.
pub struct SlowQueryTimer {
    /// Instant at which the timer was created.
    start: Instant,
    /// The statement being timed.
    stmt: QueryStatement,
    /// Context of the query being timed; forwarded with the event.
    query_ctx: QueryContextRef,
    /// Elapsed time above which the query counts as slow; nothing is sent when `None`.
    threshold: Option<Duration>,
    /// Sampling ratio applied to slow queries; every slow query is sent when `None`.
    sample_ratio: Option<f64>,
    /// Sending half of the slow query event channel.
    tx: Sender<SlowQueryEvent>,
}
461
462impl SlowQueryTimer {
463 fn send_slow_query_event(&self, elapsed: Duration, threshold: Duration) {
464 let mut slow_query_event = SlowQueryEvent {
465 cost: elapsed.as_millis() as u64,
466 threshold: threshold.as_millis() as u64,
467 query: "".to_string(),
468 query_ctx: self.query_ctx.clone(),
469
470 is_promql: false,
472 promql_range: None,
473 promql_step: None,
474 promql_start: None,
475 promql_end: None,
476 };
477
478 match &self.stmt {
479 QueryStatement::Promql(stmt) => {
480 slow_query_event.is_promql = true;
481 slow_query_event.query = stmt.expr.to_string();
482 slow_query_event.promql_step = Some(stmt.interval.as_millis() as u64);
483
484 let start = stmt
485 .start
486 .duration_since(UNIX_EPOCH)
487 .unwrap_or_default()
488 .as_millis() as i64;
489
490 let end = stmt
491 .end
492 .duration_since(UNIX_EPOCH)
493 .unwrap_or_default()
494 .as_millis() as i64;
495
496 slow_query_event.promql_range = Some((end - start) as u64);
497 slow_query_event.promql_start = Some(start);
498 slow_query_event.promql_end = Some(end);
499 }
500 QueryStatement::Sql(stmt) => {
501 slow_query_event.query = stmt.to_string();
502 }
503 }
504
505 if let Err(e) = self.tx.try_send(slow_query_event) {
507 error!(e; "Failed to send slow query event");
508 }
509 }
510}
511
512impl Drop for SlowQueryTimer {
513 fn drop(&mut self) {
514 if let Some(threshold) = self.threshold {
515 let elapsed = self.start.elapsed();
517 if elapsed > threshold {
518 if let Some(ratio) = self.sample_ratio {
519 if ratio >= 1.0 || random::<f64>() <= ratio {
522 self.send_slow_query_event(elapsed, threshold);
523 }
524 } else {
525 self.send_slow_query_event(elapsed, threshold);
527 }
528 }
529 }
530 }
531}