common_event_recorder/
recorder.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::collections::HashMap;
17use std::fmt::Debug;
18use std::sync::Arc;
19use std::time::Duration;
20
21use api::v1::column_data_type_extension::TypeExt;
22use api::v1::value::ValueData;
23use api::v1::{
24    ColumnDataType, ColumnDataTypeExtension, ColumnSchema, JsonTypeExtension, Row,
25    RowInsertRequest, RowInsertRequests, Rows, SemanticType,
26};
27use async_trait::async_trait;
28use backon::{BackoffBuilder, ExponentialBuilder};
29use common_telemetry::{debug, error, info, warn};
30use common_time::timestamp::{TimeUnit, Timestamp};
31use humantime::format_duration;
32use itertools::Itertools;
33use serde::{Deserialize, Serialize};
34use store_api::mito_engine_options::{APPEND_MODE_KEY, TTL_KEY};
35use tokio::sync::mpsc::{Receiver, Sender, channel};
36use tokio::task::JoinHandle;
37use tokio::time::sleep;
38use tokio_util::sync::CancellationToken;
39
40use crate::error::{MismatchedSchemaSnafu, Result};
41
/// The default table name for storing the events.
pub const DEFAULT_EVENTS_TABLE_NAME: &str = "events";

/// The column name for the event type.
pub const EVENTS_TABLE_TYPE_COLUMN_NAME: &str = "type";
/// The column name for the event payload.
pub const EVENTS_TABLE_PAYLOAD_COLUMN_NAME: &str = "payload";
/// The column name for the event timestamp.
pub const EVENTS_TABLE_TIMESTAMP_COLUMN_NAME: &str = "timestamp";

/// Shared, reference-counted handle to an [`EventRecorder`] trait object.
pub type EventRecorderRef = Arc<dyn EventRecorder>;

/// The time interval (5 seconds) for flushing batched events to the event handler.
/// Despite the `_SECONDS` suffix, this is a [`Duration`], not a bare number of seconds.
pub const DEFAULT_FLUSH_INTERVAL_SECONDS: Duration = Duration::from_secs(5);
/// The default TTL (90 days) for the events table.
const DEFAULT_EVENTS_TABLE_TTL: Duration = Duration::from_days(90);
/// The default compaction time window (1 day) for the events table.
pub const DEFAULT_COMPACTION_TIME_WINDOW: Duration = Duration::from_days(1);
/// The capacity of the tokio channel for transmitting events to the background processor.
const DEFAULT_CHANNEL_SIZE: usize = 2048;
/// The number of buffered events that triggers a flush to the event handler.
const DEFAULT_BUFFER_SIZE: usize = 100;
/// The maximum number of retry attempts when event handler processing fails.
const DEFAULT_MAX_RETRY_TIMES: u64 = 3;
67
68/// Event trait defines the interface for events that can be recorded and persisted as the system table.
69/// By default, the event will be persisted as the system table with the following schema:
70///
71/// - `type`: the type of the event.
72/// - `payload`: the JSON bytes of the event.
73/// - `timestamp`: the timestamp of the event.
74///
75/// The event can also add the extra schema and row to the event by overriding the `extra_schema` and `extra_row` methods.
76pub trait Event: Send + Sync + Debug {
77    /// Returns the table name of the event.
78    fn table_name(&self) -> &str {
79        DEFAULT_EVENTS_TABLE_NAME
80    }
81
82    /// Returns the type of the event.
83    fn event_type(&self) -> &str;
84
85    /// Returns the timestamp of the event. Default to the current time.
86    fn timestamp(&self) -> Timestamp {
87        Timestamp::current_time(TimeUnit::Nanosecond)
88    }
89
90    /// Returns the JSON bytes of the event as the payload. It will use JSON type to store the payload.
91    fn json_payload(&self) -> Result<String> {
92        Ok("".to_string())
93    }
94
95    /// Add the extra schema to the event with the default schema.
96    fn extra_schema(&self) -> Vec<ColumnSchema> {
97        vec![]
98    }
99
100    /// Add the extra row to the event with the default row.
101    fn extra_row(&self) -> Result<Row> {
102        Ok(Row { values: vec![] })
103    }
104
105    /// Returns the event as any type.
106    fn as_any(&self) -> &dyn Any;
107}
108
/// Eventable trait defines the interface for objects that can be converted to [Event].
pub trait Eventable: Send + Sync + Debug {
    /// Converts the object to an [Event].
    ///
    /// The default implementation returns `None`, i.e. the object produces no event unless the
    /// implementor overrides this method.
    fn to_event(&self) -> Option<Box<dyn Event>> {
        None
    }
}
116
117/// Groups events by its `event_type`.
118#[allow(clippy::borrowed_box)]
119pub fn group_events_by_type(events: &[Box<dyn Event>]) -> HashMap<&str, Vec<&Box<dyn Event>>> {
120    events
121        .iter()
122        .into_grouping_map_by(|event| event.event_type())
123        .collect()
124}
125
126/// Builds the row inserts request for the events that will be persisted to the events table. The `events` should have the same event type, or it will return an error.
127#[allow(clippy::borrowed_box)]
128pub fn build_row_inserts_request(events: &[&Box<dyn Event>]) -> Result<RowInsertRequests> {
129    // Ensure all the events are the same type.
130    validate_events(events)?;
131
132    // We already validated the events, so it's safe to get the first event to build the schema for the RowInsertRequest.
133    let event = &events[0];
134    let mut schema: Vec<ColumnSchema> = Vec::with_capacity(3 + event.extra_schema().len());
135    schema.extend(vec![
136        ColumnSchema {
137            column_name: EVENTS_TABLE_TYPE_COLUMN_NAME.to_string(),
138            datatype: ColumnDataType::String.into(),
139            semantic_type: SemanticType::Tag.into(),
140            ..Default::default()
141        },
142        ColumnSchema {
143            column_name: EVENTS_TABLE_PAYLOAD_COLUMN_NAME.to_string(),
144            datatype: ColumnDataType::Binary as i32,
145            semantic_type: SemanticType::Field as i32,
146            datatype_extension: Some(ColumnDataTypeExtension {
147                type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
148            }),
149            ..Default::default()
150        },
151        ColumnSchema {
152            column_name: EVENTS_TABLE_TIMESTAMP_COLUMN_NAME.to_string(),
153            datatype: ColumnDataType::TimestampNanosecond.into(),
154            semantic_type: SemanticType::Timestamp.into(),
155            ..Default::default()
156        },
157    ]);
158    schema.extend(event.extra_schema());
159
160    let mut rows: Vec<Row> = Vec::with_capacity(events.len());
161    for event in events {
162        let extra_row = event.extra_row()?;
163        let mut values = Vec::with_capacity(3 + extra_row.values.len());
164        values.extend([
165            ValueData::StringValue(event.event_type().to_string()).into(),
166            ValueData::BinaryValue(event.json_payload()?.into_bytes()).into(),
167            ValueData::TimestampNanosecondValue(event.timestamp().value()).into(),
168        ]);
169        values.extend(extra_row.values);
170        rows.push(Row { values });
171    }
172
173    Ok(RowInsertRequests {
174        inserts: vec![RowInsertRequest {
175            table_name: event.table_name().to_string(),
176            rows: Some(Rows { schema, rows }),
177        }],
178    })
179}
180
181// Ensure the events with the same event type have the same extra schema.
182#[allow(clippy::borrowed_box)]
183fn validate_events(events: &[&Box<dyn Event>]) -> Result<()> {
184    // It's safe to get the first event because the events are already grouped by the event type.
185    let extra_schema = events[0].extra_schema();
186    for event in events {
187        if event.extra_schema() != extra_schema {
188            MismatchedSchemaSnafu {
189                expected: extra_schema.clone(),
190                actual: event.extra_schema(),
191            }
192            .fail()?;
193        }
194    }
195    Ok(())
196}
197
/// EventRecorder trait defines the interface for recording events.
pub trait EventRecorder: Send + Sync + Debug + 'static {
    /// Records an event for persistence and processing by [EventHandler].
    ///
    /// The method is synchronous and returns nothing, so a failure to accept the event cannot be
    /// observed by the caller through this interface.
    fn record(&self, event: Box<dyn Event>);

    /// Cancels the event recorder.
    fn close(&self);
}
206
207/// EventHandlerOptions is the options for the event handler.
208#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
209pub struct EventHandlerOptions {
210    /// TTL for the events table that will be used to store the events.
211    pub ttl: Duration,
212    /// Append mode for the events table that will be used to store the events.
213    pub append_mode: bool,
214}
215
216impl Default for EventHandlerOptions {
217    fn default() -> Self {
218        Self {
219            ttl: DEFAULT_EVENTS_TABLE_TTL,
220            append_mode: true,
221        }
222    }
223}
224
225impl EventHandlerOptions {
226    /// Converts the options to the hints for the insert operation.
227    pub fn to_hints(&self) -> Vec<(&str, String)> {
228        vec![
229            (TTL_KEY, format_duration(self.ttl).to_string()),
230            (APPEND_MODE_KEY, self.append_mode.to_string()),
231        ]
232    }
233}
234
/// EventHandler trait defines the interface for how to handle the event.
#[async_trait]
pub trait EventHandler: Send + Sync + 'static {
    /// Processes and handles incoming events. The [DefaultEventHandlerImpl] implementation forwards events to frontend instances for persistence.
    /// We use `&[Box<dyn Event>]` to avoid consuming the events, so the caller can buffer the events and retry if the handler fails.
    ///
    /// Returning an error signals the caller that the batch was not handled; the background
    /// processor in this file retries the same batch with backoff in that case.
    async fn handle(&self, events: &[Box<dyn Event>]) -> Result<()>;
}
242
/// Configuration options for the event recorder.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct EventRecorderOptions {
    /// TTL for the events table that will be used to store the events.
    /// (De)serialized through `humantime_serde`.
    #[serde(with = "humantime_serde")]
    pub ttl: Duration,
}

impl Default for EventRecorderOptions {
    /// Uses `DEFAULT_EVENTS_TABLE_TTL` as the TTL.
    fn default() -> Self {
        Self {
            ttl: DEFAULT_EVENTS_TABLE_TTL,
        }
    }
}
258
259/// Implementation of [EventRecorder] that records the events and processes them in the background by the [EventHandler].
260#[derive(Debug)]
261pub struct EventRecorderImpl {
262    // The channel to send the events to the background processor.
263    tx: Sender<Box<dyn Event>>,
264    // The cancel token to cancel the background processor.
265    cancel_token: CancellationToken,
266    // The background processor to process the events.
267    handle: Option<JoinHandle<()>>,
268}
269
270impl EventRecorderImpl {
271    pub fn new(event_handler: Box<dyn EventHandler>) -> Self {
272        let (tx, rx) = channel(DEFAULT_CHANNEL_SIZE);
273        let cancel_token = CancellationToken::new();
274
275        let mut recorder = Self {
276            tx,
277            handle: None,
278            cancel_token: cancel_token.clone(),
279        };
280
281        let processor = EventProcessor::new(
282            rx,
283            event_handler,
284            DEFAULT_FLUSH_INTERVAL_SECONDS,
285            DEFAULT_MAX_RETRY_TIMES,
286        )
287        .with_cancel_token(cancel_token);
288
289        // Spawn a background task to process the events.
290        let handle = tokio::spawn(async move {
291            processor.process(DEFAULT_BUFFER_SIZE).await;
292        });
293
294        recorder.handle = Some(handle);
295
296        recorder
297    }
298}
299
300impl EventRecorder for EventRecorderImpl {
301    // Accepts an event and send it to the background handler.
302    fn record(&self, event: Box<dyn Event>) {
303        if let Err(e) = self.tx.try_send(event) {
304            error!("Failed to send event to the background processor: {}", e);
305        }
306    }
307
308    // Closes the event recorder. It will stop the background processor and flush the buffer.
309    fn close(&self) {
310        self.cancel_token.cancel();
311    }
312}
313
314impl Drop for EventRecorderImpl {
315    fn drop(&mut self) {
316        if let Some(handle) = self.handle.take() {
317            handle.abort();
318            info!("Aborted the background processor in event recorder");
319        }
320    }
321}
322
323struct EventProcessor {
324    rx: Receiver<Box<dyn Event>>,
325    event_handler: Box<dyn EventHandler>,
326    max_retry_times: u64,
327    process_interval: Duration,
328    cancel_token: CancellationToken,
329}
330
331impl EventProcessor {
332    fn new(
333        rx: Receiver<Box<dyn Event>>,
334        event_handler: Box<dyn EventHandler>,
335        process_interval: Duration,
336        max_retry_times: u64,
337    ) -> Self {
338        Self {
339            rx,
340            event_handler,
341            max_retry_times,
342            process_interval,
343            cancel_token: CancellationToken::new(),
344        }
345    }
346
347    fn with_cancel_token(mut self, cancel_token: CancellationToken) -> Self {
348        self.cancel_token = cancel_token;
349        self
350    }
351
352    async fn process(mut self, buffer_size: usize) {
353        info!("Start the background processor in event recorder to handle the received events.");
354
355        let mut buffer = Vec::with_capacity(buffer_size);
356        let mut interval = tokio::time::interval(self.process_interval);
357
358        loop {
359            tokio::select! {
360                maybe_event = self.rx.recv() => {
361                    if let Some(maybe_event) = maybe_event {
362                        debug!("Received event: {:?}", maybe_event);
363
364                        if buffer.len() >= buffer_size {
365                            debug!(
366                                "Flushing events to the event handler because the buffer is full with {} events",
367                                buffer.len()
368                            );
369                            self.flush_events_to_handler(&mut buffer).await;
370                        }
371
372                        // Push the event to the buffer, the buffer will be flushed when the interval is triggered or received a closed signal.
373                        buffer.push(maybe_event);
374                    } else {
375                        // When received a closed signal, flush the buffer and exit the loop.
376                        self.flush_events_to_handler(&mut buffer).await;
377                        break;
378                    }
379                }
380                // Cancel the processor through the cancel token.
381                _ = self.cancel_token.cancelled() => {
382                    warn!("Received a cancel signal, flushing the buffer and exiting the loop");
383                    self.flush_events_to_handler(&mut buffer).await;
384                    break;
385                }
386                // When the interval is triggered, flush the buffer and send the events to the event handler.
387                _ = interval.tick() => {
388                    self.flush_events_to_handler(&mut buffer).await;
389                }
390            }
391        }
392    }
393
394    // NOTE: While we implement a retry mechanism for failed event handling, there is no guarantee that all events will be processed successfully.
395    async fn flush_events_to_handler(&self, buffer: &mut Vec<Box<dyn Event>>) {
396        if !buffer.is_empty() {
397            debug!("Flushing {} events to the event handler", buffer.len());
398
399            let mut backoff = ExponentialBuilder::default()
400                .with_min_delay(Duration::from_millis(
401                    DEFAULT_FLUSH_INTERVAL_SECONDS.as_millis() as u64 / self.max_retry_times.max(1),
402                ))
403                .with_max_delay(Duration::from_millis(
404                    DEFAULT_FLUSH_INTERVAL_SECONDS.as_millis() as u64,
405                ))
406                .with_max_times(self.max_retry_times as usize)
407                .build();
408
409            loop {
410                match self.event_handler.handle(buffer).await {
411                    Ok(()) => {
412                        debug!("Successfully handled {} events", buffer.len());
413                        break;
414                    }
415                    Err(e) => {
416                        if let Some(d) = backoff.next() {
417                            warn!(e; "Failed to handle events, retrying...");
418                            sleep(d).await;
419                            continue;
420                        } else {
421                            warn!(
422                                e; "Failed to handle events after {} retries",
423                                self.max_retry_times
424                            );
425                            break;
426                        }
427                    }
428                }
429            }
430        }
431
432        // Clear the buffer to prevent unbounded memory growth, regardless of whether event processing succeeded or failed.
433        buffer.clear();
434    }
435}
436
437#[cfg(test)]
438mod tests {
439    use super::*;
440
441    #[derive(Debug)]
442    struct TestEvent {}
443
444    impl Event for TestEvent {
445        fn event_type(&self) -> &str {
446            "test_event"
447        }
448
449        fn json_payload(&self) -> Result<String> {
450            Ok("{\"procedure_id\": \"1234567890\"}".to_string())
451        }
452
453        fn as_any(&self) -> &dyn Any {
454            self
455        }
456    }
457
458    struct TestEventHandlerImpl {}
459
460    #[async_trait]
461    impl EventHandler for TestEventHandlerImpl {
462        async fn handle(&self, events: &[Box<dyn Event>]) -> Result<()> {
463            let event = events
464                .first()
465                .unwrap()
466                .as_any()
467                .downcast_ref::<TestEvent>()
468                .unwrap();
469            assert_eq!(
470                event.json_payload().unwrap(),
471                "{\"procedure_id\": \"1234567890\"}"
472            );
473            assert_eq!(event.event_type(), "test_event");
474            Ok(())
475        }
476    }
477
478    #[tokio::test]
479    async fn test_event_recorder() {
480        let mut event_recorder = EventRecorderImpl::new(Box::new(TestEventHandlerImpl {}));
481        event_recorder.record(Box::new(TestEvent {}));
482
483        // Sleep for a while to let the event be sent to the event handler.
484        sleep(Duration::from_millis(500)).await;
485
486        // Close the event recorder to flush the buffer.
487        event_recorder.close();
488
489        // Sleep for a while to let the background task process the event.
490        sleep(Duration::from_millis(500)).await;
491
492        if let Some(handle) = event_recorder.handle.take() {
493            assert!(handle.await.is_ok());
494        }
495    }
496
497    struct TestEventHandlerImplShouldPanic {}
498
499    #[async_trait]
500    impl EventHandler for TestEventHandlerImplShouldPanic {
501        async fn handle(&self, events: &[Box<dyn Event>]) -> Result<()> {
502            let event = events
503                .first()
504                .unwrap()
505                .as_any()
506                .downcast_ref::<TestEvent>()
507                .unwrap();
508
509            // Set the incorrect payload and event type to trigger the panic.
510            assert_eq!(
511                event.json_payload().unwrap(),
512                "{\"procedure_id\": \"should_panic\"}"
513            );
514            assert_eq!(event.event_type(), "should_panic");
515            Ok(())
516        }
517    }
518
519    #[tokio::test]
520    async fn test_event_recorder_should_panic() {
521        let mut event_recorder =
522            EventRecorderImpl::new(Box::new(TestEventHandlerImplShouldPanic {}));
523
524        event_recorder.record(Box::new(TestEvent {}));
525
526        // Sleep for a while to let the event be sent to the event handler.
527        sleep(Duration::from_millis(500)).await;
528
529        // Close the event recorder to flush the buffer.
530        event_recorder.close();
531
532        // Sleep for a while to let the background task process the event.
533        sleep(Duration::from_millis(500)).await;
534
535        if let Some(handle) = event_recorder.handle.take() {
536            assert!(handle.await.unwrap_err().is_panic());
537        }
538    }
539
    /// Event of type "A" that keeps the default table name.
    #[derive(Debug)]
    struct TestEventA {}

    impl Event for TestEventA {
        fn event_type(&self) -> &str {
            "A"
        }

        fn as_any(&self) -> &dyn Any {
            self
        }
    }

    /// Event of type "B" that is stored in its own table, "table_B".
    #[derive(Debug)]
    struct TestEventB {}

    impl Event for TestEventB {
        fn table_name(&self) -> &str {
            "table_B"
        }

        fn event_type(&self) -> &str {
            "B"
        }

        fn as_any(&self) -> &dyn Any {
            self
        }
    }

    /// Event of type "C" that is stored in its own table, "table_C".
    #[derive(Debug)]
    struct TestEventC {}

    impl Event for TestEventC {
        fn table_name(&self) -> &str {
            "table_C"
        }

        fn event_type(&self) -> &str {
            "C"
        }

        fn as_any(&self) -> &dyn Any {
            self
        }
    }
586
587    #[test]
588    fn test_group_events_by_type() {
589        let events: Vec<Box<dyn Event>> = vec![
590            Box::new(TestEventA {}),
591            Box::new(TestEventB {}),
592            Box::new(TestEventA {}),
593            Box::new(TestEventC {}),
594            Box::new(TestEventB {}),
595            Box::new(TestEventC {}),
596            Box::new(TestEventA {}),
597        ];
598
599        let event_groups = group_events_by_type(&events);
600        assert_eq!(event_groups.len(), 3);
601        assert_eq!(event_groups.get("A").unwrap().len(), 3);
602        assert_eq!(event_groups.get("B").unwrap().len(), 2);
603        assert_eq!(event_groups.get("C").unwrap().len(), 2);
604    }
605
606    #[test]
607    fn test_build_row_inserts_request() {
608        let events: Vec<Box<dyn Event>> = vec![
609            Box::new(TestEventA {}),
610            Box::new(TestEventB {}),
611            Box::new(TestEventA {}),
612            Box::new(TestEventC {}),
613            Box::new(TestEventB {}),
614            Box::new(TestEventC {}),
615            Box::new(TestEventA {}),
616        ];
617
618        let event_groups = group_events_by_type(&events);
619        assert_eq!(event_groups.len(), 3);
620        assert_eq!(event_groups.get("A").unwrap().len(), 3);
621        assert_eq!(event_groups.get("B").unwrap().len(), 2);
622        assert_eq!(event_groups.get("C").unwrap().len(), 2);
623
624        for (event_type, events) in event_groups {
625            let row_inserts_request = build_row_inserts_request(&events).unwrap();
626            if event_type == "A" {
627                assert_eq!(row_inserts_request.inserts.len(), 1);
628                assert_eq!(
629                    row_inserts_request.inserts[0].table_name,
630                    DEFAULT_EVENTS_TABLE_NAME
631                );
632                assert_eq!(
633                    row_inserts_request.inserts[0]
634                        .rows
635                        .as_ref()
636                        .unwrap()
637                        .rows
638                        .len(),
639                    3
640                );
641            } else if event_type == "B" {
642                assert_eq!(row_inserts_request.inserts.len(), 1);
643                assert_eq!(row_inserts_request.inserts[0].table_name, "table_B");
644                assert_eq!(
645                    row_inserts_request.inserts[0]
646                        .rows
647                        .as_ref()
648                        .unwrap()
649                        .rows
650                        .len(),
651                    2
652                );
653            } else if event_type == "C" {
654                assert_eq!(row_inserts_request.inserts.len(), 1);
655                assert_eq!(row_inserts_request.inserts[0].table_name, "table_C");
656                assert_eq!(
657                    row_inserts_request.inserts[0]
658                        .rows
659                        .as_ref()
660                        .unwrap()
661                        .rows
662                        .len(),
663                    2
664                );
665            } else {
666                panic!("Unexpected event type: {}", event_type);
667            }
668        }
669    }
670}