common_event_recorder/
recorder.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::collections::HashMap;
17use std::fmt::Debug;
18use std::sync::Arc;
19use std::time::Duration;
20
21use api::v1::column_data_type_extension::TypeExt;
22use api::v1::value::ValueData;
23use api::v1::{
24    ColumnDataType, ColumnDataTypeExtension, ColumnSchema, JsonTypeExtension, Row,
25    RowInsertRequest, RowInsertRequests, Rows, SemanticType,
26};
27use async_trait::async_trait;
28use backon::{BackoffBuilder, ExponentialBuilder};
29use common_telemetry::{debug, error, info, warn};
30use common_time::timestamp::{TimeUnit, Timestamp};
31use humantime::format_duration;
32use itertools::Itertools;
33use serde::{Deserialize, Serialize};
34use store_api::mito_engine_options::{APPEND_MODE_KEY, TTL_KEY};
35use tokio::sync::mpsc::{channel, Receiver, Sender};
36use tokio::task::JoinHandle;
37use tokio::time::sleep;
38use tokio_util::sync::CancellationToken;
39
40use crate::error::{MismatchedSchemaSnafu, Result};
41
/// The default table name for storing the events. This is what [Event::table_name] returns
/// unless an event overrides it.
pub const DEFAULT_EVENTS_TABLE_NAME: &str = "events";

/// The column name for the event type. The column is written as a string tag.
pub const EVENTS_TABLE_TYPE_COLUMN_NAME: &str = "type";
/// The column name for the event payload. The column is written as a binary field carrying the
/// JSON type extension.
pub const EVENTS_TABLE_PAYLOAD_COLUMN_NAME: &str = "payload";
/// The column name for the event timestamp. The column is written with nanosecond precision.
pub const EVENTS_TABLE_TIMESTAMP_COLUMN_NAME: &str = "timestamp";

/// EventRecorderRef is the shared, reference-counted handle to an [EventRecorder].
pub type EventRecorderRef = Arc<dyn EventRecorder>;
54
/// The time interval for flushing batched events to the event handler.
pub const DEFAULT_FLUSH_INTERVAL_SECONDS: Duration = Duration::from_secs(5);
/// The default TTL(90 days) for the events table.
///
/// Spelled out in seconds so that it only relies on the stable `Duration::from_secs` constructor.
const DEFAULT_EVENTS_TABLE_TTL: Duration = Duration::from_secs(90 * 24 * 60 * 60);
/// The capacity of the tokio channel for transmitting events to background processor.
const DEFAULT_CHANNEL_SIZE: usize = 2048;
/// The size of the buffer for batching events before flushing to event handler.
const DEFAULT_BUFFER_SIZE: usize = 100;
/// The maximum number of retry attempts when event handler processing fails.
const DEFAULT_MAX_RETRY_TIMES: u64 = 3;
65
66/// Event trait defines the interface for events that can be recorded and persisted as the system table.
67/// By default, the event will be persisted as the system table with the following schema:
68///
69/// - `type`: the type of the event.
70/// - `payload`: the JSON bytes of the event.
71/// - `timestamp`: the timestamp of the event.
72///
73/// The event can also add the extra schema and row to the event by overriding the `extra_schema` and `extra_row` methods.
74pub trait Event: Send + Sync + Debug {
75    /// Returns the table name of the event.
76    fn table_name(&self) -> &str {
77        DEFAULT_EVENTS_TABLE_NAME
78    }
79
80    /// Returns the type of the event.
81    fn event_type(&self) -> &str;
82
83    /// Returns the timestamp of the event. Default to the current time.
84    fn timestamp(&self) -> Timestamp {
85        Timestamp::current_time(TimeUnit::Nanosecond)
86    }
87
88    /// Returns the JSON bytes of the event as the payload. It will use JSON type to store the payload.
89    fn json_payload(&self) -> Result<String> {
90        Ok("".to_string())
91    }
92
93    /// Add the extra schema to the event with the default schema.
94    fn extra_schema(&self) -> Vec<ColumnSchema> {
95        vec![]
96    }
97
98    /// Add the extra row to the event with the default row.
99    fn extra_row(&self) -> Result<Row> {
100        Ok(Row { values: vec![] })
101    }
102
103    /// Returns the event as any type.
104    fn as_any(&self) -> &dyn Any;
105}
106
/// Eventable trait defines the interface for objects that can be converted to [Event].
pub trait Eventable: Send + Sync + Debug {
    /// Converts the object to an [Event].
    ///
    /// The default implementation returns `None`, i.e. the object produces no event unless the
    /// implementor overrides this method.
    fn to_event(&self) -> Option<Box<dyn Event>> {
        None
    }
}
114
115/// Groups events by its `event_type`.
116#[allow(clippy::borrowed_box)]
117pub fn group_events_by_type(events: &[Box<dyn Event>]) -> HashMap<&str, Vec<&Box<dyn Event>>> {
118    events
119        .iter()
120        .into_grouping_map_by(|event| event.event_type())
121        .collect()
122}
123
124/// Builds the row inserts request for the events that will be persisted to the events table. The `events` should have the same event type, or it will return an error.
125#[allow(clippy::borrowed_box)]
126pub fn build_row_inserts_request(events: &[&Box<dyn Event>]) -> Result<RowInsertRequests> {
127    // Ensure all the events are the same type.
128    validate_events(events)?;
129
130    // We already validated the events, so it's safe to get the first event to build the schema for the RowInsertRequest.
131    let event = &events[0];
132    let mut schema: Vec<ColumnSchema> = Vec::with_capacity(3 + event.extra_schema().len());
133    schema.extend(vec![
134        ColumnSchema {
135            column_name: EVENTS_TABLE_TYPE_COLUMN_NAME.to_string(),
136            datatype: ColumnDataType::String.into(),
137            semantic_type: SemanticType::Tag.into(),
138            ..Default::default()
139        },
140        ColumnSchema {
141            column_name: EVENTS_TABLE_PAYLOAD_COLUMN_NAME.to_string(),
142            datatype: ColumnDataType::Binary as i32,
143            semantic_type: SemanticType::Field as i32,
144            datatype_extension: Some(ColumnDataTypeExtension {
145                type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
146            }),
147            ..Default::default()
148        },
149        ColumnSchema {
150            column_name: EVENTS_TABLE_TIMESTAMP_COLUMN_NAME.to_string(),
151            datatype: ColumnDataType::TimestampNanosecond.into(),
152            semantic_type: SemanticType::Timestamp.into(),
153            ..Default::default()
154        },
155    ]);
156    schema.extend(event.extra_schema());
157
158    let mut rows: Vec<Row> = Vec::with_capacity(events.len());
159    for event in events {
160        let extra_row = event.extra_row()?;
161        let mut values = Vec::with_capacity(3 + extra_row.values.len());
162        values.extend([
163            ValueData::StringValue(event.event_type().to_string()).into(),
164            ValueData::BinaryValue(event.json_payload()?.into_bytes()).into(),
165            ValueData::TimestampNanosecondValue(event.timestamp().value()).into(),
166        ]);
167        values.extend(extra_row.values);
168        rows.push(Row { values });
169    }
170
171    Ok(RowInsertRequests {
172        inserts: vec![RowInsertRequest {
173            table_name: event.table_name().to_string(),
174            rows: Some(Rows { schema, rows }),
175        }],
176    })
177}
178
179// Ensure the events with the same event type have the same extra schema.
180#[allow(clippy::borrowed_box)]
181fn validate_events(events: &[&Box<dyn Event>]) -> Result<()> {
182    // It's safe to get the first event because the events are already grouped by the event type.
183    let extra_schema = events[0].extra_schema();
184    for event in events {
185        if event.extra_schema() != extra_schema {
186            MismatchedSchemaSnafu {
187                expected: extra_schema.clone(),
188                actual: event.extra_schema(),
189            }
190            .fail()?;
191        }
192    }
193    Ok(())
194}
195
/// EventRecorder trait defines the interface for recording events.
pub trait EventRecorder: Send + Sync + Debug + 'static {
    /// Records an event for persistence and processing by [EventHandler].
    ///
    /// The method is synchronous and returns nothing, so a failure to record the event is not
    /// reported to the caller.
    fn record(&self, event: Box<dyn Event>);

    /// Cancels the event recorder.
    fn close(&self);
}
204
205/// EventHandlerOptions is the options for the event handler.
206#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
207pub struct EventHandlerOptions {
208    /// TTL for the events table that will be used to store the events.
209    pub ttl: Duration,
210    /// Append mode for the events table that will be used to store the events.
211    pub append_mode: bool,
212}
213
214impl Default for EventHandlerOptions {
215    fn default() -> Self {
216        Self {
217            ttl: DEFAULT_EVENTS_TABLE_TTL,
218            append_mode: true,
219        }
220    }
221}
222
223impl EventHandlerOptions {
224    /// Converts the options to the hints for the insert operation.
225    pub fn to_hints(&self) -> Vec<(&str, String)> {
226        vec![
227            (TTL_KEY, format_duration(self.ttl).to_string()),
228            (APPEND_MODE_KEY, self.append_mode.to_string()),
229        ]
230    }
231}
232
/// EventHandler trait defines the interface for how to handle the event.
#[async_trait]
pub trait EventHandler: Send + Sync + 'static {
    /// Processes and handles incoming events. The [DefaultEventHandlerImpl] implementation forwards events to frontend instances for persistence.
    /// We use `&[Box<dyn Event>]` to avoid consuming the events, so the caller can buffer the events and retry if the handler fails.
    ///
    /// Returning an `Err` signals that the batch was not handled; the background processor in
    /// this file then calls `handle` again with the same batch until its retries are exhausted.
    async fn handle(&self, events: &[Box<dyn Event>]) -> Result<()>;
}
240
241/// Configuration options for the event recorder.
242#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
243pub struct EventRecorderOptions {
244    /// TTL for the events table that will be used to store the events.
245    #[serde(with = "humantime_serde")]
246    pub ttl: Duration,
247}
248
249impl Default for EventRecorderOptions {
250    fn default() -> Self {
251        Self {
252            ttl: DEFAULT_EVENTS_TABLE_TTL,
253        }
254    }
255}
256
257/// Implementation of [EventRecorder] that records the events and processes them in the background by the [EventHandler].
258#[derive(Debug)]
259pub struct EventRecorderImpl {
260    // The channel to send the events to the background processor.
261    tx: Sender<Box<dyn Event>>,
262    // The cancel token to cancel the background processor.
263    cancel_token: CancellationToken,
264    // The background processor to process the events.
265    handle: Option<JoinHandle<()>>,
266}
267
268impl EventRecorderImpl {
269    pub fn new(event_handler: Box<dyn EventHandler>) -> Self {
270        let (tx, rx) = channel(DEFAULT_CHANNEL_SIZE);
271        let cancel_token = CancellationToken::new();
272
273        let mut recorder = Self {
274            tx,
275            handle: None,
276            cancel_token: cancel_token.clone(),
277        };
278
279        let processor = EventProcessor::new(
280            rx,
281            event_handler,
282            DEFAULT_FLUSH_INTERVAL_SECONDS,
283            DEFAULT_MAX_RETRY_TIMES,
284        )
285        .with_cancel_token(cancel_token);
286
287        // Spawn a background task to process the events.
288        let handle = tokio::spawn(async move {
289            processor.process(DEFAULT_BUFFER_SIZE).await;
290        });
291
292        recorder.handle = Some(handle);
293
294        recorder
295    }
296}
297
298impl EventRecorder for EventRecorderImpl {
299    // Accepts an event and send it to the background handler.
300    fn record(&self, event: Box<dyn Event>) {
301        if let Err(e) = self.tx.try_send(event) {
302            error!("Failed to send event to the background processor: {}", e);
303        }
304    }
305
306    // Closes the event recorder. It will stop the background processor and flush the buffer.
307    fn close(&self) {
308        self.cancel_token.cancel();
309    }
310}
311
312impl Drop for EventRecorderImpl {
313    fn drop(&mut self) {
314        if let Some(handle) = self.handle.take() {
315            handle.abort();
316            info!("Aborted the background processor in event recorder");
317        }
318    }
319}
320
321struct EventProcessor {
322    rx: Receiver<Box<dyn Event>>,
323    event_handler: Box<dyn EventHandler>,
324    max_retry_times: u64,
325    process_interval: Duration,
326    cancel_token: CancellationToken,
327}
328
329impl EventProcessor {
330    fn new(
331        rx: Receiver<Box<dyn Event>>,
332        event_handler: Box<dyn EventHandler>,
333        process_interval: Duration,
334        max_retry_times: u64,
335    ) -> Self {
336        Self {
337            rx,
338            event_handler,
339            max_retry_times,
340            process_interval,
341            cancel_token: CancellationToken::new(),
342        }
343    }
344
345    fn with_cancel_token(mut self, cancel_token: CancellationToken) -> Self {
346        self.cancel_token = cancel_token;
347        self
348    }
349
350    async fn process(mut self, buffer_size: usize) {
351        info!("Start the background processor in event recorder to handle the received events.");
352
353        let mut buffer = Vec::with_capacity(buffer_size);
354        let mut interval = tokio::time::interval(self.process_interval);
355
356        loop {
357            tokio::select! {
358                maybe_event = self.rx.recv() => {
359                    if let Some(maybe_event) = maybe_event {
360                        debug!("Received event: {:?}", maybe_event);
361
362                        if buffer.len() >= buffer_size {
363                            debug!(
364                                "Flushing events to the event handler because the buffer is full with {} events",
365                                buffer.len()
366                            );
367                            self.flush_events_to_handler(&mut buffer).await;
368                        }
369
370                        // Push the event to the buffer, the buffer will be flushed when the interval is triggered or received a closed signal.
371                        buffer.push(maybe_event);
372                    } else {
373                        // When received a closed signal, flush the buffer and exit the loop.
374                        self.flush_events_to_handler(&mut buffer).await;
375                        break;
376                    }
377                }
378                // Cancel the processor through the cancel token.
379                _ = self.cancel_token.cancelled() => {
380                    warn!("Received a cancel signal, flushing the buffer and exiting the loop");
381                    self.flush_events_to_handler(&mut buffer).await;
382                    break;
383                }
384                // When the interval is triggered, flush the buffer and send the events to the event handler.
385                _ = interval.tick() => {
386                    self.flush_events_to_handler(&mut buffer).await;
387                }
388            }
389        }
390    }
391
392    // NOTE: While we implement a retry mechanism for failed event handling, there is no guarantee that all events will be processed successfully.
393    async fn flush_events_to_handler(&self, buffer: &mut Vec<Box<dyn Event>>) {
394        if !buffer.is_empty() {
395            debug!("Flushing {} events to the event handler", buffer.len());
396
397            let mut backoff = ExponentialBuilder::default()
398                .with_min_delay(Duration::from_millis(
399                    DEFAULT_FLUSH_INTERVAL_SECONDS.as_millis() as u64 / self.max_retry_times.max(1),
400                ))
401                .with_max_delay(Duration::from_millis(
402                    DEFAULT_FLUSH_INTERVAL_SECONDS.as_millis() as u64,
403                ))
404                .with_max_times(self.max_retry_times as usize)
405                .build();
406
407            loop {
408                match self.event_handler.handle(buffer).await {
409                    Ok(()) => {
410                        debug!("Successfully handled {} events", buffer.len());
411                        break;
412                    }
413                    Err(e) => {
414                        if let Some(d) = backoff.next() {
415                            warn!(e; "Failed to handle events, retrying...");
416                            sleep(d).await;
417                            continue;
418                        } else {
419                            warn!(
420                                e; "Failed to handle events after {} retries",
421                                self.max_retry_times
422                            );
423                            break;
424                        }
425                    }
426                }
427            }
428        }
429
430        // Clear the buffer to prevent unbounded memory growth, regardless of whether event processing succeeded or failed.
431        buffer.clear();
432    }
433}
434
#[cfg(test)]
mod tests {
    use super::*;

    /// An event with a fixed type and payload, used to exercise the recorder.
    #[derive(Debug)]
    struct TestEvent {}

    impl Event for TestEvent {
        fn event_type(&self) -> &str {
            "test_event"
        }

        fn json_payload(&self) -> Result<String> {
            Ok("{\"procedure_id\": \"1234567890\"}".to_string())
        }

        fn as_any(&self) -> &dyn Any {
            self
        }
    }

    /// A handler that checks that the first event of the batch is the expected [TestEvent].
    struct TestEventHandlerImpl {}

    #[async_trait]
    impl EventHandler for TestEventHandlerImpl {
        async fn handle(&self, events: &[Box<dyn Event>]) -> Result<()> {
            let first = events.first().unwrap();
            let event = first.as_any().downcast_ref::<TestEvent>().unwrap();
            assert_eq!(
                event.json_payload().unwrap(),
                "{\"procedure_id\": \"1234567890\"}"
            );
            assert_eq!(event.event_type(), "test_event");
            Ok(())
        }
    }

    #[tokio::test]
    async fn test_event_recorder() {
        let mut event_recorder = EventRecorderImpl::new(Box::new(TestEventHandlerImpl {}));
        event_recorder.record(Box::new(TestEvent {}));

        // Give the background task some time to receive the event.
        sleep(Duration::from_millis(500)).await;

        // Closing the recorder makes the background task flush its buffer.
        event_recorder.close();

        // Give the background task some time to handle the event.
        sleep(Duration::from_millis(500)).await;

        // The background task must have finished without panicking.
        if let Some(handle) = event_recorder.handle.take() {
            assert!(handle.await.is_ok());
        }
    }

    /// A handler whose expectations never match [TestEvent], so handling a batch panics.
    struct TestEventHandlerImplShouldPanic {}

    #[async_trait]
    impl EventHandler for TestEventHandlerImplShouldPanic {
        async fn handle(&self, events: &[Box<dyn Event>]) -> Result<()> {
            let first = events.first().unwrap();
            let event = first.as_any().downcast_ref::<TestEvent>().unwrap();

            // Expect the incorrect payload and event type on purpose to trigger the panic.
            assert_eq!(
                event.json_payload().unwrap(),
                "{\"procedure_id\": \"should_panic\"}"
            );
            assert_eq!(event.event_type(), "should_panic");
            Ok(())
        }
    }

    #[tokio::test]
    async fn test_event_recorder_should_panic() {
        let mut event_recorder =
            EventRecorderImpl::new(Box::new(TestEventHandlerImplShouldPanic {}));

        event_recorder.record(Box::new(TestEvent {}));

        // Give the background task some time to receive the event.
        sleep(Duration::from_millis(500)).await;

        // Closing the recorder makes the background task flush its buffer.
        event_recorder.close();

        // Give the background task some time to handle the event.
        sleep(Duration::from_millis(500)).await;

        // The panic of the handler must surface through the join handle.
        if let Some(handle) = event_recorder.handle.take() {
            assert!(handle.await.unwrap_err().is_panic());
        }
    }

    /// An event of type `A` stored in the default events table.
    #[derive(Debug)]
    struct TestEventA {}

    impl Event for TestEventA {
        fn event_type(&self) -> &str {
            "A"
        }

        fn as_any(&self) -> &dyn Any {
            self
        }
    }

    /// An event of type `B` stored in `table_B`.
    #[derive(Debug)]
    struct TestEventB {}

    impl Event for TestEventB {
        fn table_name(&self) -> &str {
            "table_B"
        }

        fn event_type(&self) -> &str {
            "B"
        }

        fn as_any(&self) -> &dyn Any {
            self
        }
    }

    /// An event of type `C` stored in `table_C`.
    #[derive(Debug)]
    struct TestEventC {}

    impl Event for TestEventC {
        fn table_name(&self) -> &str {
            "table_C"
        }

        fn event_type(&self) -> &str {
            "C"
        }

        fn as_any(&self) -> &dyn Any {
            self
        }
    }

    #[test]
    fn test_group_events_by_type() {
        let events: Vec<Box<dyn Event>> = vec![
            Box::new(TestEventA {}),
            Box::new(TestEventB {}),
            Box::new(TestEventA {}),
            Box::new(TestEventC {}),
            Box::new(TestEventB {}),
            Box::new(TestEventC {}),
            Box::new(TestEventA {}),
        ];

        let event_groups = group_events_by_type(&events);
        assert_eq!(event_groups.len(), 3);
        assert_eq!(event_groups.get("A").unwrap().len(), 3);
        assert_eq!(event_groups.get("B").unwrap().len(), 2);
        assert_eq!(event_groups.get("C").unwrap().len(), 2);
    }

    #[test]
    fn test_build_row_inserts_request() {
        let events: Vec<Box<dyn Event>> = vec![
            Box::new(TestEventA {}),
            Box::new(TestEventB {}),
            Box::new(TestEventA {}),
            Box::new(TestEventC {}),
            Box::new(TestEventB {}),
            Box::new(TestEventC {}),
            Box::new(TestEventA {}),
        ];

        let event_groups = group_events_by_type(&events);
        assert_eq!(event_groups.len(), 3);
        assert_eq!(event_groups.get("A").unwrap().len(), 3);
        assert_eq!(event_groups.get("B").unwrap().len(), 2);
        assert_eq!(event_groups.get("C").unwrap().len(), 2);

        for (event_type, events) in event_groups {
            let row_inserts_request = build_row_inserts_request(&events).unwrap();

            // The expected table name and number of rows for every known event type.
            let (expected_table_name, expected_rows) = match event_type {
                "A" => (DEFAULT_EVENTS_TABLE_NAME, 3),
                "B" => ("table_B", 2),
                "C" => ("table_C", 2),
                _ => panic!("Unexpected event type: {}", event_type),
            };

            assert_eq!(row_inserts_request.inserts.len(), 1);
            let insert = &row_inserts_request.inserts[0];
            assert_eq!(insert.table_name, expected_table_name);
            assert_eq!(insert.rows.as_ref().unwrap().rows.len(), expected_rows);
        }
    }
}