1use std::time::Duration;
16
17use async_trait::async_trait;
18use client::inserter::{Context, InsertOptions, Inserter};
19use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_PRIVATE_SCHEMA_NAME};
20use common_error::ext::BoxedError;
21use common_event_recorder::error::{InsertEventsSnafu, Result};
22use common_event_recorder::{
23 build_row_inserts_request, group_events_by_type, Event, EventHandler,
24 DEFAULT_COMPACTION_TIME_WINDOW,
25};
26use common_frontend::slow_query_event::SLOW_QUERY_EVENT_TYPE;
27use datafusion::common::HashMap;
28use operator::statement::{InserterImpl, StatementExecutorRef};
29use snafu::ResultExt;
30
/// Event handler that writes events through [`Inserter`]s.
///
/// It holds one fallback inserter plus a table of inserters keyed by event
/// type, so that individual event types can be written with their own
/// insert options.
pub struct EventHandlerImpl {
    /// Inserter used for every event type that has no entry in `inserters`.
    default_inserter: Box<dyn Inserter>,
    /// Inserters keyed by event type; consulted before `default_inserter`.
    inserters: HashMap<String, Box<dyn Inserter>>,
}
37
38impl EventHandlerImpl {
39 pub fn new(
41 statement_executor: StatementExecutorRef,
42 slow_query_ttl: Duration,
43 global_ttl: Duration,
44 ) -> Self {
45 Self {
46 inserters: HashMap::from([(
47 SLOW_QUERY_EVENT_TYPE.to_string(),
48 Box::new(InserterImpl::new(
49 statement_executor.clone(),
50 Some(InsertOptions {
51 ttl: slow_query_ttl,
52 append_mode: true,
53 twcs_compaction_time_window: Some(DEFAULT_COMPACTION_TIME_WINDOW),
54 }),
55 )) as _,
56 )]),
57 default_inserter: Box::new(InserterImpl::new(
58 statement_executor.clone(),
59 Some(InsertOptions {
60 ttl: global_ttl,
61 append_mode: true,
62 twcs_compaction_time_window: Some(DEFAULT_COMPACTION_TIME_WINDOW),
63 }),
64 )),
65 }
66 }
67
68 fn inserter(&self, event_type: &str) -> &dyn Inserter {
69 let Some(inserter) = self.inserters.get(event_type) else {
70 return self.default_inserter.as_ref();
71 };
72
73 inserter.as_ref()
74 }
75}
76
/// Insert context shared by every `insert_rows` call made by the handler:
/// the default catalog together with the default private schema.
const DEFAULT_CONTEXT: Context = Context {
    catalog: DEFAULT_CATALOG_NAME,
    schema: DEFAULT_PRIVATE_SCHEMA_NAME,
};
81
82#[async_trait]
83impl EventHandler for EventHandlerImpl {
84 async fn handle(&self, events: &[Box<dyn Event>]) -> Result<()> {
85 let event_groups = group_events_by_type(events);
86
87 for (event_type, events) in event_groups {
88 let requests = build_row_inserts_request(&events)?;
89 let inserter = self.inserter(event_type);
90
91 inserter
92 .insert_rows(&DEFAULT_CONTEXT, requests)
93 .await
94 .map_err(BoxedError::new)
95 .context(InsertEventsSnafu)?;
96 }
97
98 Ok(())
99 }
100}