1use async_trait::async_trait;
16use client::inserter::{Context, Inserter};
17use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_PRIVATE_SCHEMA_NAME};
18use common_error::ext::BoxedError;
19use common_event_recorder::error::{InsertEventsSnafu, Result};
20use common_event_recorder::{build_row_inserts_request, group_events_by_type, Event, EventHandler};
21use snafu::ResultExt;
22
23pub mod region_migration_event;
24
25pub struct EventHandlerImpl {
28 inserter: Box<dyn Inserter>,
29}
30
31impl EventHandlerImpl {
32 pub fn new(inserter: Box<dyn Inserter>) -> Self {
33 Self { inserter }
34 }
35}
36
37#[async_trait]
38impl EventHandler for EventHandlerImpl {
39 async fn handle(&self, events: &[Box<dyn Event>]) -> Result<()> {
40 let event_groups = group_events_by_type(events);
41
42 for (_, events) in event_groups {
43 let requests = build_row_inserts_request(&events)?;
44 self.inserter
45 .insert_rows(
46 &Context {
47 catalog: DEFAULT_CATALOG_NAME,
48 schema: DEFAULT_PRIVATE_SCHEMA_NAME,
49 },
50 requests,
51 )
52 .await
53 .map_err(BoxedError::new)
54 .context(InsertEventsSnafu)?;
55 }
56
57 Ok(())
58 }
59}