meta_srv/
events.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use async_trait::async_trait;
16use client::inserter::{Context, Inserter};
17use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_PRIVATE_SCHEMA_NAME};
18use common_error::ext::BoxedError;
19use common_event_recorder::error::{InsertEventsSnafu, Result};
20use common_event_recorder::{build_row_inserts_request, group_events_by_type, Event, EventHandler};
21use snafu::ResultExt;
22
23pub mod region_migration_event;
24
/// EventHandlerImpl is the default event handler implementation in metasrv.
/// It sends the received events to the frontend instances.
///
/// The handler owns a boxed [`Inserter`]; every batch of events it receives is
/// turned into row-insert requests and written through that inserter.
pub struct EventHandlerImpl {
    // Sink used to write the row-insert requests built from events.
    inserter: Box<dyn Inserter>,
}
30
31impl EventHandlerImpl {
32    pub fn new(inserter: Box<dyn Inserter>) -> Self {
33        Self { inserter }
34    }
35}
36
37#[async_trait]
38impl EventHandler for EventHandlerImpl {
39    async fn handle(&self, events: &[Box<dyn Event>]) -> Result<()> {
40        let event_groups = group_events_by_type(events);
41
42        for (_, events) in event_groups {
43            let requests = build_row_inserts_request(&events)?;
44            self.inserter
45                .insert_rows(
46                    &Context {
47                        catalog: DEFAULT_CATALOG_NAME,
48                        schema: DEFAULT_PRIVATE_SCHEMA_NAME,
49                    },
50                    requests,
51                )
52                .await
53                .map_err(BoxedError::new)
54                .context(InsertEventsSnafu)?;
55        }
56
57        Ok(())
58    }
59}