common_event_recorder/
recorder.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::collections::HashMap;
17use std::fmt::Debug;
18use std::sync::Arc;
19use std::time::Duration;
20
21use api::v1::column_data_type_extension::TypeExt;
22use api::v1::value::ValueData;
23use api::v1::{
24    ColumnDataType, ColumnDataTypeExtension, ColumnSchema, JsonTypeExtension, Row,
25    RowInsertRequest, RowInsertRequests, Rows, SemanticType,
26};
27use async_trait::async_trait;
28use backon::{BackoffBuilder, ExponentialBuilder};
29use common_telemetry::{debug, error, info, warn};
30use common_time::timestamp::{TimeUnit, Timestamp};
31use humantime::format_duration;
32use itertools::Itertools;
33use serde::{Deserialize, Serialize};
34use store_api::mito_engine_options::{APPEND_MODE_KEY, TTL_KEY};
35use tokio::sync::mpsc::{Receiver, Sender, channel};
36use tokio::task::JoinHandle;
37use tokio::time::sleep;
38use tokio_util::sync::CancellationToken;
39
40use crate::error::{MismatchedSchemaSnafu, Result};
41
/// Name of the table that events are written to unless [`Event::table_name`] is overridden.
pub const DEFAULT_EVENTS_TABLE_NAME: &str = "events";

/// Column holding [`Event::event_type`].
pub const EVENTS_TABLE_TYPE_COLUMN_NAME: &str = "type";
/// Column holding the JSONB-encoded [`Event::json_payload`].
pub const EVENTS_TABLE_PAYLOAD_COLUMN_NAME: &str = "payload";
/// Column holding [`Event::timestamp`].
pub const EVENTS_TABLE_TIMESTAMP_COLUMN_NAME: &str = "timestamp";
51
/// EventRecorderRef is the reference to the event recorder: a shared, reference-counted
/// handle to any [EventRecorder] implementation.
pub type EventRecorderRef = Arc<dyn EventRecorder>;
54
/// How often the background processor flushes batched events to the event handler.
pub const DEFAULT_FLUSH_INTERVAL_SECONDS: Duration = Duration::from_secs(5);
/// Default TTL of the events table: 90 days, expressed in seconds.
const DEFAULT_EVENTS_TABLE_TTL: Duration = Duration::from_secs(90 * 24 * 60 * 60);
/// Default compaction time window of the events table: one day, expressed in seconds.
pub const DEFAULT_COMPACTION_TIME_WINDOW: Duration = Duration::from_secs(24 * 60 * 60);
/// Capacity of the channel that carries recorded events to the background processor.
const DEFAULT_CHANNEL_SIZE: usize = 2048;
/// Number of events the processor buffers before flushing them to the event handler.
const DEFAULT_BUFFER_SIZE: usize = 100;
/// How many times a failed flush is retried before the batch is given up.
const DEFAULT_MAX_RETRY_TIMES: u64 = 3;
67
68/// Event trait defines the interface for events that can be recorded and persisted as the system table.
69/// By default, the event will be persisted as the system table with the following schema:
70///
71/// - `type`: the type of the event.
72/// - `payload`: the JSON bytes of the event.
73/// - `timestamp`: the timestamp of the event.
74///
75/// The event can also add the extra schema and row to the event by overriding the `extra_schema` and `extra_row` methods.
76pub trait Event: Send + Sync + Debug {
77    /// Returns the table name of the event.
78    fn table_name(&self) -> &str {
79        DEFAULT_EVENTS_TABLE_NAME
80    }
81
82    /// Returns the type of the event.
83    fn event_type(&self) -> &str;
84
85    /// Returns the timestamp of the event. Default to the current time.
86    fn timestamp(&self) -> Timestamp {
87        Timestamp::current_time(TimeUnit::Nanosecond)
88    }
89
90    /// Returns the event payload as a structured JSON value. It will be encoded as JSONB when stored.
91    fn json_payload(&self) -> Result<serde_json::Value> {
92        Ok(serde_json::Value::Null)
93    }
94
95    /// Add the extra schema to the event with the default schema.
96    fn extra_schema(&self) -> Vec<ColumnSchema> {
97        vec![]
98    }
99
100    /// Add the extra rows to the event with the default row.
101    fn extra_rows(&self) -> Result<Vec<Row>> {
102        Ok(vec![Row { values: vec![] }])
103    }
104
105    /// Returns the event as any type.
106    fn as_any(&self) -> &dyn Any;
107}
108
109/// Eventable trait defines the interface for objects that can be converted to [Event].
110pub trait Eventable: Send + Sync + Debug {
111    /// Converts the object to an [Event].
112    fn to_event(&self) -> Option<Box<dyn Event>> {
113        None
114    }
115}
116
117/// Groups events by its `event_type`.
118#[allow(clippy::borrowed_box)]
119pub fn group_events_by_type(events: &[Box<dyn Event>]) -> HashMap<&str, Vec<&Box<dyn Event>>> {
120    events
121        .iter()
122        .into_grouping_map_by(|event| event.event_type())
123        .collect()
124}
125
126/// Builds the row inserts request for the events that will be persisted to the events table. The `events` should have the same event type, or it will return an error.
127#[allow(clippy::borrowed_box)]
128pub fn build_row_inserts_request(events: &[&Box<dyn Event>]) -> Result<RowInsertRequests> {
129    // Ensure all the events are the same type.
130    validate_events(events)?;
131
132    // We already validated the events, so it's safe to get the first event to build the schema for the RowInsertRequest.
133    let event = &events[0];
134    let mut schema: Vec<ColumnSchema> = Vec::with_capacity(3 + event.extra_schema().len());
135    schema.extend(vec![
136        ColumnSchema {
137            column_name: EVENTS_TABLE_TYPE_COLUMN_NAME.to_string(),
138            datatype: ColumnDataType::String.into(),
139            semantic_type: SemanticType::Tag.into(),
140            ..Default::default()
141        },
142        ColumnSchema {
143            column_name: EVENTS_TABLE_PAYLOAD_COLUMN_NAME.to_string(),
144            datatype: ColumnDataType::Binary as i32,
145            semantic_type: SemanticType::Field as i32,
146            datatype_extension: Some(ColumnDataTypeExtension {
147                type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
148            }),
149            ..Default::default()
150        },
151        ColumnSchema {
152            column_name: EVENTS_TABLE_TIMESTAMP_COLUMN_NAME.to_string(),
153            datatype: ColumnDataType::TimestampNanosecond.into(),
154            semantic_type: SemanticType::Timestamp.into(),
155            ..Default::default()
156        },
157    ]);
158    schema.extend(event.extra_schema());
159
160    let mut rows: Vec<Row> = Vec::with_capacity(events.len());
161    for event in events {
162        let extra_rows = event.extra_rows()?;
163        for extra_row in extra_rows {
164            let mut values = Vec::with_capacity(3 + extra_row.values.len());
165            values.extend([
166                ValueData::StringValue(event.event_type().to_string()).into(),
167                ValueData::BinaryValue(jsonb::Value::from(&event.json_payload()?).to_vec()).into(),
168                ValueData::TimestampNanosecondValue(event.timestamp().value()).into(),
169            ]);
170            values.extend(extra_row.values);
171            rows.push(Row { values });
172        }
173    }
174
175    Ok(RowInsertRequests {
176        inserts: vec![RowInsertRequest {
177            table_name: event.table_name().to_string(),
178            rows: Some(Rows { schema, rows }),
179        }],
180    })
181}
182
183// Ensure the events with the same event type have the same extra schema.
184#[allow(clippy::borrowed_box)]
185fn validate_events(events: &[&Box<dyn Event>]) -> Result<()> {
186    // It's safe to get the first event because the events are already grouped by the event type.
187    let extra_schema = events[0].extra_schema();
188    for event in events {
189        if event.extra_schema() != extra_schema {
190            MismatchedSchemaSnafu {
191                expected: extra_schema.clone(),
192                actual: event.extra_schema(),
193            }
194            .fail()?;
195        }
196    }
197    Ok(())
198}
199
200/// EventRecorder trait defines the interface for recording events.
201pub trait EventRecorder: Send + Sync + Debug + 'static {
202    /// Records an event for persistence and processing by [EventHandler].
203    fn record(&self, event: Box<dyn Event>);
204
205    /// Cancels the event recorder.
206    fn close(&self);
207}
208
/// EventHandlerOptions is the options for the event handler.
/// They are turned into insert hints by `to_hints`.
// NOTE(review): unlike `EventRecorderOptions::ttl`, `ttl` here has no
// `#[serde(with = "humantime_serde")]`, so it (de)serializes with serde's default `Duration`
// representation rather than a human-readable string. Confirm this is intended.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct EventHandlerOptions {
    /// TTL for the events table that will be used to store the events.
    /// Emitted as the `TTL_KEY` hint, formatted with `humantime::format_duration`.
    pub ttl: Duration,
    /// Append mode for the events table that will be used to store the events.
    /// Emitted as the `APPEND_MODE_KEY` hint (`"true"` / `"false"`).
    pub append_mode: bool,
}
217
218impl Default for EventHandlerOptions {
219    fn default() -> Self {
220        Self {
221            ttl: DEFAULT_EVENTS_TABLE_TTL,
222            append_mode: true,
223        }
224    }
225}
226
227impl EventHandlerOptions {
228    /// Converts the options to the hints for the insert operation.
229    pub fn to_hints(&self) -> Vec<(&str, String)> {
230        vec![
231            (TTL_KEY, format_duration(self.ttl).to_string()),
232            (APPEND_MODE_KEY, self.append_mode.to_string()),
233        ]
234    }
235}
236
237/// EventHandler trait defines the interface for how to handle the event.
238#[async_trait]
239pub trait EventHandler: Send + Sync + 'static {
240    /// Processes and handles incoming events. The [DefaultEventHandlerImpl] implementation forwards events to frontend instances for persistence.
241    /// We use `&[Box<dyn Event>]` to avoid consuming the events, so the caller can buffer the events and retry if the handler fails.
242    async fn handle(&self, events: &[Box<dyn Event>]) -> Result<()>;
243}
244
/// Configuration options for the event recorder.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct EventRecorderOptions {
    /// TTL for the events table that will be used to store the events.
    /// (De)serialized as a human-readable duration string via `humantime_serde`.
    #[serde(with = "humantime_serde")]
    pub ttl: Duration,
}
252
253impl Default for EventRecorderOptions {
254    fn default() -> Self {
255        Self {
256            ttl: DEFAULT_EVENTS_TABLE_TTL,
257        }
258    }
259}
260
/// Implementation of [EventRecorder] that records the events and processes them in the background by the [EventHandler].
#[derive(Debug)]
pub struct EventRecorderImpl {
    // The channel to send the events to the background processor.
    // It is bounded; `record` uses `try_send`, so events are not queued once it is full.
    tx: Sender<Box<dyn Event>>,
    // The cancel token to cancel the background processor.
    // It is shared with the processor, so `close()` stops its loop.
    cancel_token: CancellationToken,
    // The background processor to process the events.
    // Set right after spawning in `new`; taken (left as `None`) by `Drop`.
    handle: Option<JoinHandle<()>>,
}
271
272impl EventRecorderImpl {
273    pub fn new(event_handler: Box<dyn EventHandler>) -> Self {
274        let (tx, rx) = channel(DEFAULT_CHANNEL_SIZE);
275        let cancel_token = CancellationToken::new();
276
277        let mut recorder = Self {
278            tx,
279            handle: None,
280            cancel_token: cancel_token.clone(),
281        };
282
283        let processor = EventProcessor::new(
284            rx,
285            event_handler,
286            DEFAULT_FLUSH_INTERVAL_SECONDS,
287            DEFAULT_MAX_RETRY_TIMES,
288        )
289        .with_cancel_token(cancel_token);
290
291        // Spawn a background task to process the events.
292        let handle = tokio::spawn(async move {
293            processor.process(DEFAULT_BUFFER_SIZE).await;
294        });
295
296        recorder.handle = Some(handle);
297
298        recorder
299    }
300}
301
302impl EventRecorder for EventRecorderImpl {
303    // Accepts an event and send it to the background handler.
304    fn record(&self, event: Box<dyn Event>) {
305        if let Err(e) = self.tx.try_send(event) {
306            error!("Failed to send event to the background processor: {}", e);
307        }
308    }
309
310    // Closes the event recorder. It will stop the background processor and flush the buffer.
311    fn close(&self) {
312        self.cancel_token.cancel();
313    }
314}
315
316impl Drop for EventRecorderImpl {
317    fn drop(&mut self) {
318        if let Some(handle) = self.handle.take() {
319            handle.abort();
320            info!("Aborted the background processor in event recorder");
321        }
322    }
323}
324
325struct EventProcessor {
326    rx: Receiver<Box<dyn Event>>,
327    event_handler: Box<dyn EventHandler>,
328    max_retry_times: u64,
329    process_interval: Duration,
330    cancel_token: CancellationToken,
331}
332
333impl EventProcessor {
334    fn new(
335        rx: Receiver<Box<dyn Event>>,
336        event_handler: Box<dyn EventHandler>,
337        process_interval: Duration,
338        max_retry_times: u64,
339    ) -> Self {
340        Self {
341            rx,
342            event_handler,
343            max_retry_times,
344            process_interval,
345            cancel_token: CancellationToken::new(),
346        }
347    }
348
349    fn with_cancel_token(mut self, cancel_token: CancellationToken) -> Self {
350        self.cancel_token = cancel_token;
351        self
352    }
353
354    async fn process(mut self, buffer_size: usize) {
355        info!("Start the background processor in event recorder to handle the received events.");
356
357        let mut buffer = Vec::with_capacity(buffer_size);
358        let mut interval = tokio::time::interval(self.process_interval);
359
360        loop {
361            tokio::select! {
362                maybe_event = self.rx.recv() => {
363                    if let Some(maybe_event) = maybe_event {
364                        debug!("Received event: {:?}", maybe_event);
365
366                        if buffer.len() >= buffer_size {
367                            debug!(
368                                "Flushing events to the event handler because the buffer is full with {} events",
369                                buffer.len()
370                            );
371                            self.flush_events_to_handler(&mut buffer).await;
372                        }
373
374                        // Push the event to the buffer, the buffer will be flushed when the interval is triggered or received a closed signal.
375                        buffer.push(maybe_event);
376                    } else {
377                        // When received a closed signal, flush the buffer and exit the loop.
378                        self.flush_events_to_handler(&mut buffer).await;
379                        break;
380                    }
381                }
382                // Cancel the processor through the cancel token.
383                _ = self.cancel_token.cancelled() => {
384                    warn!("Received a cancel signal, flushing the buffer and exiting the loop");
385                    self.flush_events_to_handler(&mut buffer).await;
386                    break;
387                }
388                // When the interval is triggered, flush the buffer and send the events to the event handler.
389                _ = interval.tick() => {
390                    self.flush_events_to_handler(&mut buffer).await;
391                }
392            }
393        }
394    }
395
396    // NOTE: While we implement a retry mechanism for failed event handling, there is no guarantee that all events will be processed successfully.
397    async fn flush_events_to_handler(&self, buffer: &mut Vec<Box<dyn Event>>) {
398        if !buffer.is_empty() {
399            debug!("Flushing {} events to the event handler", buffer.len());
400
401            let mut backoff = ExponentialBuilder::default()
402                .with_min_delay(Duration::from_millis(
403                    DEFAULT_FLUSH_INTERVAL_SECONDS.as_millis() as u64 / self.max_retry_times.max(1),
404                ))
405                .with_max_delay(Duration::from_millis(
406                    DEFAULT_FLUSH_INTERVAL_SECONDS.as_millis() as u64,
407                ))
408                .with_max_times(self.max_retry_times as usize)
409                .build();
410
411            loop {
412                match self.event_handler.handle(buffer).await {
413                    Ok(()) => {
414                        debug!("Successfully handled {} events", buffer.len());
415                        break;
416                    }
417                    Err(e) => {
418                        if let Some(d) = backoff.next() {
419                            warn!(e; "Failed to handle events, retrying...");
420                            sleep(d).await;
421                            continue;
422                        } else {
423                            warn!(
424                                e; "Failed to handle events after {} retries",
425                                self.max_retry_times
426                            );
427                            break;
428                        }
429                    }
430                }
431            }
432        }
433
434        // Clear the buffer to prevent unbounded memory growth, regardless of whether event processing succeeded or failed.
435        buffer.clear();
436    }
437}
438
#[cfg(test)]
mod tests {
    use serde_json::json;

    use super::*;

    #[derive(Debug)]
    struct TestEvent {}

    impl Event for TestEvent {
        fn event_type(&self) -> &str {
            "test_event"
        }

        fn json_payload(&self) -> Result<serde_json::Value> {
            Ok(json!({"procedure_id": "1234567890"}))
        }

        fn as_any(&self) -> &dyn Any {
            self
        }
    }

    /// Handler that asserts it receives the expected `TestEvent`.
    struct TestEventHandlerImpl {}

    #[async_trait]
    impl EventHandler for TestEventHandlerImpl {
        async fn handle(&self, events: &[Box<dyn Event>]) -> Result<()> {
            let event = events
                .first()
                .unwrap()
                .as_any()
                .downcast_ref::<TestEvent>()
                .unwrap();
            assert_eq!(
                event.json_payload().unwrap(),
                json!({"procedure_id": "1234567890"}),
            );
            assert_eq!(event.event_type(), "test_event");
            Ok(())
        }
    }

    #[tokio::test]
    async fn test_event_recorder() {
        let mut event_recorder = EventRecorderImpl::new(Box::new(TestEventHandlerImpl {}));
        event_recorder.record(Box::new(TestEvent {}));

        // Sleep for a while to let the event be sent to the event handler.
        sleep(Duration::from_millis(500)).await;

        // Close the event recorder to flush the buffer.
        event_recorder.close();

        // Awaiting the handle waits for the background task to flush and exit, so no extra
        // sleep is needed. `expect` ensures the assertion is never silently skipped.
        let handle = event_recorder
            .handle
            .take()
            .expect("the background processor handle should be present");
        assert!(handle.await.is_ok());
    }

    /// Handler whose assertions always fail, so the background task panics.
    struct TestEventHandlerImplShouldPanic {}

    #[async_trait]
    impl EventHandler for TestEventHandlerImplShouldPanic {
        async fn handle(&self, events: &[Box<dyn Event>]) -> Result<()> {
            let event = events
                .first()
                .unwrap()
                .as_any()
                .downcast_ref::<TestEvent>()
                .unwrap();

            // Set the incorrect payload and event type to trigger the panic.
            assert_eq!(
                event.json_payload().unwrap(),
                "{\"procedure_id\": \"should_panic\"}"
            );
            assert_eq!(event.event_type(), "should_panic");
            Ok(())
        }
    }

    #[tokio::test]
    async fn test_event_recorder_should_panic() {
        let mut event_recorder =
            EventRecorderImpl::new(Box::new(TestEventHandlerImplShouldPanic {}));

        event_recorder.record(Box::new(TestEvent {}));

        // Sleep for a while to let the event be sent to the event handler.
        sleep(Duration::from_millis(500)).await;

        // Close the event recorder to flush the buffer.
        event_recorder.close();

        // Awaiting the handle waits for the background task to finish (here: by panicking).
        let handle = event_recorder
            .handle
            .take()
            .expect("the background processor handle should be present");
        assert!(handle.await.unwrap_err().is_panic());
    }

    #[derive(Debug)]
    struct TestEventA {}

    impl Event for TestEventA {
        fn event_type(&self) -> &str {
            "A"
        }

        fn as_any(&self) -> &dyn Any {
            self
        }
    }

    #[derive(Debug)]
    struct TestEventB {}

    impl Event for TestEventB {
        fn table_name(&self) -> &str {
            "table_B"
        }

        fn event_type(&self) -> &str {
            "B"
        }

        fn as_any(&self) -> &dyn Any {
            self
        }
    }

    #[derive(Debug)]
    struct TestEventC {}

    impl Event for TestEventC {
        fn table_name(&self) -> &str {
            "table_C"
        }

        fn event_type(&self) -> &str {
            "C"
        }

        fn as_any(&self) -> &dyn Any {
            self
        }
    }

    #[test]
    fn test_group_events_by_type() {
        let events: Vec<Box<dyn Event>> = vec![
            Box::new(TestEventA {}),
            Box::new(TestEventB {}),
            Box::new(TestEventA {}),
            Box::new(TestEventC {}),
            Box::new(TestEventB {}),
            Box::new(TestEventC {}),
            Box::new(TestEventA {}),
        ];

        let event_groups = group_events_by_type(&events);
        assert_eq!(event_groups.len(), 3);
        assert_eq!(event_groups.get("A").unwrap().len(), 3);
        assert_eq!(event_groups.get("B").unwrap().len(), 2);
        assert_eq!(event_groups.get("C").unwrap().len(), 2);
    }

    #[test]
    fn test_build_row_inserts_request() {
        let events: Vec<Box<dyn Event>> = vec![
            Box::new(TestEventA {}),
            Box::new(TestEventB {}),
            Box::new(TestEventA {}),
            Box::new(TestEventC {}),
            Box::new(TestEventB {}),
            Box::new(TestEventC {}),
            Box::new(TestEventA {}),
        ];

        let event_groups = group_events_by_type(&events);
        assert_eq!(event_groups.len(), 3);
        assert_eq!(event_groups.get("A").unwrap().len(), 3);
        assert_eq!(event_groups.get("B").unwrap().len(), 2);
        assert_eq!(event_groups.get("C").unwrap().len(), 2);

        for (event_type, events) in event_groups {
            let row_inserts_request = build_row_inserts_request(&events).unwrap();
            if event_type == "A" {
                assert_eq!(row_inserts_request.inserts.len(), 1);
                assert_eq!(
                    row_inserts_request.inserts[0].table_name,
                    DEFAULT_EVENTS_TABLE_NAME
                );
                assert_eq!(
                    row_inserts_request.inserts[0]
                        .rows
                        .as_ref()
                        .unwrap()
                        .rows
                        .len(),
                    3
                );
            } else if event_type == "B" {
                assert_eq!(row_inserts_request.inserts.len(), 1);
                assert_eq!(row_inserts_request.inserts[0].table_name, "table_B");
                assert_eq!(
                    row_inserts_request.inserts[0]
                        .rows
                        .as_ref()
                        .unwrap()
                        .rows
                        .len(),
                    2
                );
            } else if event_type == "C" {
                assert_eq!(row_inserts_request.inserts.len(), 1);
                assert_eq!(row_inserts_request.inserts[0].table_name, "table_C");
                assert_eq!(
                    row_inserts_request.inserts[0]
                        .rows
                        .as_ref()
                        .unwrap()
                        .rows
                        .len(),
                    2
                );
            } else {
                panic!("Unexpected event type: {}", event_type);
            }
        }
    }
}