frontend/
events.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::time::Duration;
16
17use async_trait::async_trait;
18use client::inserter::{Context, InsertOptions, Inserter};
19use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_PRIVATE_SCHEMA_NAME};
20use common_error::ext::BoxedError;
21use common_event_recorder::error::{InsertEventsSnafu, Result};
22use common_event_recorder::{
23    build_row_inserts_request, group_events_by_type, Event, EventHandler,
24    DEFAULT_COMPACTION_TIME_WINDOW,
25};
26use common_frontend::slow_query_event::SLOW_QUERY_EVENT_TYPE;
27use datafusion::common::HashMap;
28use operator::statement::{InserterImpl, StatementExecutorRef};
29use snafu::ResultExt;
30
31/// EventHandlerImpl is the default event handler implementation in frontend.
32pub struct EventHandlerImpl {
33    default_inserter: Box<dyn Inserter>,
34    /// The inserters for the event types.
35    inserters: HashMap<String, Box<dyn Inserter>>,
36}
37
38impl EventHandlerImpl {
39    /// Create a new EventHandlerImpl.
40    pub fn new(
41        statement_executor: StatementExecutorRef,
42        slow_query_ttl: Duration,
43        global_ttl: Duration,
44    ) -> Self {
45        Self {
46            inserters: HashMap::from([(
47                SLOW_QUERY_EVENT_TYPE.to_string(),
48                Box::new(InserterImpl::new(
49                    statement_executor.clone(),
50                    Some(InsertOptions {
51                        ttl: slow_query_ttl,
52                        append_mode: true,
53                        twcs_compaction_time_window: Some(DEFAULT_COMPACTION_TIME_WINDOW),
54                    }),
55                )) as _,
56            )]),
57            default_inserter: Box::new(InserterImpl::new(
58                statement_executor.clone(),
59                Some(InsertOptions {
60                    ttl: global_ttl,
61                    append_mode: true,
62                    twcs_compaction_time_window: Some(DEFAULT_COMPACTION_TIME_WINDOW),
63                }),
64            )),
65        }
66    }
67
68    fn inserter(&self, event_type: &str) -> &dyn Inserter {
69        let Some(inserter) = self.inserters.get(event_type) else {
70            return self.default_inserter.as_ref();
71        };
72
73        inserter.as_ref()
74    }
75}
76
77const DEFAULT_CONTEXT: Context = Context {
78    catalog: DEFAULT_CATALOG_NAME,
79    schema: DEFAULT_PRIVATE_SCHEMA_NAME,
80};
81
82#[async_trait]
83impl EventHandler for EventHandlerImpl {
84    async fn handle(&self, events: &[Box<dyn Event>]) -> Result<()> {
85        let event_groups = group_events_by_type(events);
86
87        for (event_type, events) in event_groups {
88            let requests = build_row_inserts_request(&events)?;
89            let inserter = self.inserter(event_type);
90
91            inserter
92                .insert_rows(&DEFAULT_CONTEXT, requests)
93                .await
94                .map_err(BoxedError::new)
95                .context(InsertEventsSnafu)?;
96        }
97
98        Ok(())
99    }
100}