1use std::any::Any;
16use std::collections::HashMap;
17use std::fmt::Debug;
18use std::sync::Arc;
19use std::time::Duration;
20
21use api::v1::column_data_type_extension::TypeExt;
22use api::v1::value::ValueData;
23use api::v1::{
24 ColumnDataType, ColumnDataTypeExtension, ColumnSchema, JsonTypeExtension, Row,
25 RowInsertRequest, RowInsertRequests, Rows, SemanticType,
26};
27use async_trait::async_trait;
28use backon::{BackoffBuilder, ExponentialBuilder};
29use common_telemetry::{debug, error, info, warn};
30use common_time::timestamp::{TimeUnit, Timestamp};
31use humantime::format_duration;
32use itertools::Itertools;
33use serde::{Deserialize, Serialize};
34use store_api::mito_engine_options::{APPEND_MODE_KEY, TTL_KEY};
35use tokio::sync::mpsc::{Receiver, Sender, channel};
36use tokio::task::JoinHandle;
37use tokio::time::sleep;
38use tokio_util::sync::CancellationToken;
39
40use crate::error::{MismatchedSchemaSnafu, Result};
41
/// Default name of the table events are written into (returned by the default
/// [`Event::table_name`]).
pub const DEFAULT_EVENTS_TABLE_NAME: &str = "events";

/// Name of the tag column holding [`Event::event_type`].
pub const EVENTS_TABLE_TYPE_COLUMN_NAME: &str = "type";
/// Name of the field column holding [`Event::json_payload`], stored as JSON binary.
pub const EVENTS_TABLE_PAYLOAD_COLUMN_NAME: &str = "payload";
/// Name of the time index column holding [`Event::timestamp`] in nanoseconds.
pub const EVENTS_TABLE_TIMESTAMP_COLUMN_NAME: &str = "timestamp";

/// Shared handle to an [`EventRecorder`]; `Send + Sync` because the trait requires both.
pub type EventRecorderRef = Arc<dyn EventRecorder>;
54
/// Default interval at which the background processor flushes buffered events.
///
/// Despite the `_SECONDS` suffix this is a [`Duration`], not a number of seconds; the name is
/// kept for backward compatibility.
pub const DEFAULT_FLUSH_INTERVAL_SECONDS: Duration = Duration::from_secs(5);
/// Default TTL of the events table: 90 days.
// Spelled in seconds because `Duration::from_days` is still unstable (`duration_constructors`).
const DEFAULT_EVENTS_TABLE_TTL: Duration = Duration::from_secs(90 * 24 * 60 * 60);
/// Default compaction time window: 1 day.
pub const DEFAULT_COMPACTION_TIME_WINDOW: Duration = Duration::from_secs(24 * 60 * 60);
/// Capacity of the bounded channel between the recorder and its background processor.
const DEFAULT_CHANNEL_SIZE: usize = 2048;
/// Number of buffered events that triggers an early flush.
const DEFAULT_BUFFER_SIZE: usize = 100;
/// Maximum number of retries when the event handler fails.
const DEFAULT_MAX_RETRY_TIMES: u64 = 3;
67
68pub trait Event: Send + Sync + Debug {
77 fn table_name(&self) -> &str {
79 DEFAULT_EVENTS_TABLE_NAME
80 }
81
82 fn event_type(&self) -> &str;
84
85 fn timestamp(&self) -> Timestamp {
87 Timestamp::current_time(TimeUnit::Nanosecond)
88 }
89
90 fn json_payload(&self) -> Result<serde_json::Value> {
92 Ok(serde_json::Value::Null)
93 }
94
95 fn extra_schema(&self) -> Vec<ColumnSchema> {
97 vec![]
98 }
99
100 fn extra_rows(&self) -> Result<Vec<Row>> {
102 Ok(vec![Row { values: vec![] }])
103 }
104
105 fn as_any(&self) -> &dyn Any;
107}
108
109pub trait Eventable: Send + Sync + Debug {
111 fn to_event(&self) -> Option<Box<dyn Event>> {
113 None
114 }
115}
116
117#[allow(clippy::borrowed_box)]
119pub fn group_events_by_type(events: &[Box<dyn Event>]) -> HashMap<&str, Vec<&Box<dyn Event>>> {
120 events
121 .iter()
122 .into_grouping_map_by(|event| event.event_type())
123 .collect()
124}
125
126#[allow(clippy::borrowed_box)]
128pub fn build_row_inserts_request(events: &[&Box<dyn Event>]) -> Result<RowInsertRequests> {
129 validate_events(events)?;
131
132 let event = &events[0];
134 let mut schema: Vec<ColumnSchema> = Vec::with_capacity(3 + event.extra_schema().len());
135 schema.extend(vec![
136 ColumnSchema {
137 column_name: EVENTS_TABLE_TYPE_COLUMN_NAME.to_string(),
138 datatype: ColumnDataType::String.into(),
139 semantic_type: SemanticType::Tag.into(),
140 ..Default::default()
141 },
142 ColumnSchema {
143 column_name: EVENTS_TABLE_PAYLOAD_COLUMN_NAME.to_string(),
144 datatype: ColumnDataType::Binary as i32,
145 semantic_type: SemanticType::Field as i32,
146 datatype_extension: Some(ColumnDataTypeExtension {
147 type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
148 }),
149 ..Default::default()
150 },
151 ColumnSchema {
152 column_name: EVENTS_TABLE_TIMESTAMP_COLUMN_NAME.to_string(),
153 datatype: ColumnDataType::TimestampNanosecond.into(),
154 semantic_type: SemanticType::Timestamp.into(),
155 ..Default::default()
156 },
157 ]);
158 schema.extend(event.extra_schema());
159
160 let mut rows: Vec<Row> = Vec::with_capacity(events.len());
161 for event in events {
162 let extra_rows = event.extra_rows()?;
163 for extra_row in extra_rows {
164 let mut values = Vec::with_capacity(3 + extra_row.values.len());
165 values.extend([
166 ValueData::StringValue(event.event_type().to_string()).into(),
167 ValueData::BinaryValue(jsonb::Value::from(&event.json_payload()?).to_vec()).into(),
168 ValueData::TimestampNanosecondValue(event.timestamp().value()).into(),
169 ]);
170 values.extend(extra_row.values);
171 rows.push(Row { values });
172 }
173 }
174
175 Ok(RowInsertRequests {
176 inserts: vec![RowInsertRequest {
177 table_name: event.table_name().to_string(),
178 rows: Some(Rows { schema, rows }),
179 }],
180 })
181}
182
183#[allow(clippy::borrowed_box)]
185fn validate_events(events: &[&Box<dyn Event>]) -> Result<()> {
186 let extra_schema = events[0].extra_schema();
188 for event in events {
189 if event.extra_schema() != extra_schema {
190 MismatchedSchemaSnafu {
191 expected: extra_schema.clone(),
192 actual: event.extra_schema(),
193 }
194 .fail()?;
195 }
196 }
197 Ok(())
198}
199
/// Records [`Event`]s. Implementations must be `Send + Sync` so one recorder can be shared,
/// e.g. via `Arc<dyn EventRecorder>`.
pub trait EventRecorder: Send + Sync + Debug + 'static {
    /// Hands `event` over to the recorder; there is no return value, so failures are not
    /// reported to the caller.
    fn record(&self, event: Box<dyn Event>);

    /// Shuts the recorder down. What happens to pending events is implementation-specific.
    fn close(&self);
}
208
209#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
211pub struct EventHandlerOptions {
212 pub ttl: Duration,
214 pub append_mode: bool,
216}
217
218impl Default for EventHandlerOptions {
219 fn default() -> Self {
220 Self {
221 ttl: DEFAULT_EVENTS_TABLE_TTL,
222 append_mode: true,
223 }
224 }
225}
226
227impl EventHandlerOptions {
228 pub fn to_hints(&self) -> Vec<(&str, String)> {
230 vec![
231 (TTL_KEY, format_duration(self.ttl).to_string()),
232 (APPEND_MODE_KEY, self.append_mode.to_string()),
233 ]
234 }
235}
236
/// Consumes batches of recorded events.
///
/// The background processor of `EventRecorderImpl` calls [`EventHandler::handle`] with its
/// buffered events and retries the same batch (with backoff) when an `Err` is returned.
#[async_trait]
pub trait EventHandler: Send + Sync + 'static {
    /// Handles a batch of events; an `Err` signals that the batch was not handled.
    async fn handle(&self, events: &[Box<dyn Event>]) -> Result<()>;
}
244
245#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
247pub struct EventRecorderOptions {
248 #[serde(with = "humantime_serde")]
250 pub ttl: Duration,
251}
252
253impl Default for EventRecorderOptions {
254 fn default() -> Self {
255 Self {
256 ttl: DEFAULT_EVENTS_TABLE_TTL,
257 }
258 }
259}
260
261#[derive(Debug)]
263pub struct EventRecorderImpl {
264 tx: Sender<Box<dyn Event>>,
266 cancel_token: CancellationToken,
268 handle: Option<JoinHandle<()>>,
270}
271
272impl EventRecorderImpl {
273 pub fn new(event_handler: Box<dyn EventHandler>) -> Self {
274 let (tx, rx) = channel(DEFAULT_CHANNEL_SIZE);
275 let cancel_token = CancellationToken::new();
276
277 let mut recorder = Self {
278 tx,
279 handle: None,
280 cancel_token: cancel_token.clone(),
281 };
282
283 let processor = EventProcessor::new(
284 rx,
285 event_handler,
286 DEFAULT_FLUSH_INTERVAL_SECONDS,
287 DEFAULT_MAX_RETRY_TIMES,
288 )
289 .with_cancel_token(cancel_token);
290
291 let handle = tokio::spawn(async move {
293 processor.process(DEFAULT_BUFFER_SIZE).await;
294 });
295
296 recorder.handle = Some(handle);
297
298 recorder
299 }
300}
301
302impl EventRecorder for EventRecorderImpl {
303 fn record(&self, event: Box<dyn Event>) {
305 if let Err(e) = self.tx.try_send(event) {
306 error!("Failed to send event to the background processor: {}", e);
307 }
308 }
309
310 fn close(&self) {
312 self.cancel_token.cancel();
313 }
314}
315
316impl Drop for EventRecorderImpl {
317 fn drop(&mut self) {
318 if let Some(handle) = self.handle.take() {
319 handle.abort();
320 info!("Aborted the background processor in event recorder");
321 }
322 }
323}
324
325struct EventProcessor {
326 rx: Receiver<Box<dyn Event>>,
327 event_handler: Box<dyn EventHandler>,
328 max_retry_times: u64,
329 process_interval: Duration,
330 cancel_token: CancellationToken,
331}
332
333impl EventProcessor {
334 fn new(
335 rx: Receiver<Box<dyn Event>>,
336 event_handler: Box<dyn EventHandler>,
337 process_interval: Duration,
338 max_retry_times: u64,
339 ) -> Self {
340 Self {
341 rx,
342 event_handler,
343 max_retry_times,
344 process_interval,
345 cancel_token: CancellationToken::new(),
346 }
347 }
348
349 fn with_cancel_token(mut self, cancel_token: CancellationToken) -> Self {
350 self.cancel_token = cancel_token;
351 self
352 }
353
354 async fn process(mut self, buffer_size: usize) {
355 info!("Start the background processor in event recorder to handle the received events.");
356
357 let mut buffer = Vec::with_capacity(buffer_size);
358 let mut interval = tokio::time::interval(self.process_interval);
359
360 loop {
361 tokio::select! {
362 maybe_event = self.rx.recv() => {
363 if let Some(maybe_event) = maybe_event {
364 debug!("Received event: {:?}", maybe_event);
365
366 if buffer.len() >= buffer_size {
367 debug!(
368 "Flushing events to the event handler because the buffer is full with {} events",
369 buffer.len()
370 );
371 self.flush_events_to_handler(&mut buffer).await;
372 }
373
374 buffer.push(maybe_event);
376 } else {
377 self.flush_events_to_handler(&mut buffer).await;
379 break;
380 }
381 }
382 _ = self.cancel_token.cancelled() => {
384 warn!("Received a cancel signal, flushing the buffer and exiting the loop");
385 self.flush_events_to_handler(&mut buffer).await;
386 break;
387 }
388 _ = interval.tick() => {
390 self.flush_events_to_handler(&mut buffer).await;
391 }
392 }
393 }
394 }
395
396 async fn flush_events_to_handler(&self, buffer: &mut Vec<Box<dyn Event>>) {
398 if !buffer.is_empty() {
399 debug!("Flushing {} events to the event handler", buffer.len());
400
401 let mut backoff = ExponentialBuilder::default()
402 .with_min_delay(Duration::from_millis(
403 DEFAULT_FLUSH_INTERVAL_SECONDS.as_millis() as u64 / self.max_retry_times.max(1),
404 ))
405 .with_max_delay(Duration::from_millis(
406 DEFAULT_FLUSH_INTERVAL_SECONDS.as_millis() as u64,
407 ))
408 .with_max_times(self.max_retry_times as usize)
409 .build();
410
411 loop {
412 match self.event_handler.handle(buffer).await {
413 Ok(()) => {
414 debug!("Successfully handled {} events", buffer.len());
415 break;
416 }
417 Err(e) => {
418 if let Some(d) = backoff.next() {
419 warn!(e; "Failed to handle events, retrying...");
420 sleep(d).await;
421 continue;
422 } else {
423 warn!(
424 e; "Failed to handle events after {} retries",
425 self.max_retry_times
426 );
427 break;
428 }
429 }
430 }
431 }
432 }
433
434 buffer.clear();
436 }
437}
438
#[cfg(test)]
mod tests {
    use serde_json::json;

    use super::*;

    /// Event with a fixed type and JSON payload, used by the recorder tests.
    #[derive(Debug)]
    struct TestEvent {}

    impl Event for TestEvent {
        fn event_type(&self) -> &str {
            "test_event"
        }

        fn json_payload(&self) -> Result<serde_json::Value> {
            Ok(json!({"procedure_id": "1234567890"}))
        }

        fn as_any(&self) -> &dyn Any {
            self
        }
    }

    /// Handler asserting that the first received event is the expected `TestEvent`.
    struct TestEventHandlerImpl {}

    #[async_trait]
    impl EventHandler for TestEventHandlerImpl {
        async fn handle(&self, events: &[Box<dyn Event>]) -> Result<()> {
            let event = events
                .first()
                .unwrap()
                .as_any()
                .downcast_ref::<TestEvent>()
                .unwrap();
            assert_eq!(
                event.json_payload().unwrap(),
                json!({"procedure_id": "1234567890"}),
            );
            assert_eq!(event.event_type(), "test_event");
            Ok(())
        }
    }

    /// The handler's assertions pass, so the background task must finish without panicking.
    #[tokio::test]
    async fn test_event_recorder() {
        let mut event_recorder = EventRecorderImpl::new(Box::new(TestEventHandlerImpl {}));
        event_recorder.record(Box::new(TestEvent {}));

        // Give the background processor time to receive the event before closing.
        sleep(Duration::from_millis(500)).await;

        event_recorder.close();

        // Give the background processor time to flush and exit after cancellation.
        sleep(Duration::from_millis(500)).await;

        if let Some(handle) = event_recorder.handle.take() {
            assert!(handle.await.is_ok());
        }
    }

    /// Handler whose assertions cannot hold for `TestEvent`, so it panics inside the
    /// background task.
    struct TestEventHandlerImplShouldPanic {}

    #[async_trait]
    impl EventHandler for TestEventHandlerImplShouldPanic {
        async fn handle(&self, events: &[Box<dyn Event>]) -> Result<()> {
            let event = events
                .first()
                .unwrap()
                .as_any()
                .downcast_ref::<TestEvent>()
                .unwrap();

            assert_eq!(
                // Deliberately wrong expectation: the payload is a JSON object, not this string.
                event.json_payload().unwrap(),
                "{\"procedure_id\": \"should_panic\"}"
            );
            assert_eq!(event.event_type(), "should_panic");
            Ok(())
        }
    }

    /// A panic in the handler must surface as a panicked `JoinError` on the task handle.
    #[tokio::test]
    async fn test_event_recorder_should_panic() {
        let mut event_recorder =
            EventRecorderImpl::new(Box::new(TestEventHandlerImplShouldPanic {}));

        event_recorder.record(Box::new(TestEvent {}));

        // Give the background processor time to receive the event before closing.
        sleep(Duration::from_millis(500)).await;

        event_recorder.close();

        // Give the background processor time to flush (and panic) after cancellation.
        sleep(Duration::from_millis(500)).await;

        if let Some(handle) = event_recorder.handle.take() {
            assert!(handle.await.unwrap_err().is_panic());
        }
    }

    /// Event of type "A" written to the default events table.
    #[derive(Debug)]
    struct TestEventA {}

    impl Event for TestEventA {
        fn event_type(&self) -> &str {
            "A"
        }

        fn as_any(&self) -> &dyn Any {
            self
        }
    }

    /// Event of type "B" written to `table_B`.
    #[derive(Debug)]
    struct TestEventB {}

    impl Event for TestEventB {
        fn table_name(&self) -> &str {
            "table_B"
        }

        fn event_type(&self) -> &str {
            "B"
        }

        fn as_any(&self) -> &dyn Any {
            self
        }
    }

    /// Event of type "C" written to `table_C`.
    #[derive(Debug)]
    struct TestEventC {}

    impl Event for TestEventC {
        fn table_name(&self) -> &str {
            "table_C"
        }

        fn event_type(&self) -> &str {
            "C"
        }

        fn as_any(&self) -> &dyn Any {
            self
        }
    }

    /// Interleaved events must be grouped by type with the correct counts.
    #[test]
    fn test_group_events_by_type() {
        let events: Vec<Box<dyn Event>> = vec![
            Box::new(TestEventA {}),
            Box::new(TestEventB {}),
            Box::new(TestEventA {}),
            Box::new(TestEventC {}),
            Box::new(TestEventB {}),
            Box::new(TestEventC {}),
            Box::new(TestEventA {}),
        ];

        let event_groups = group_events_by_type(&events);
        assert_eq!(event_groups.len(), 3);
        assert_eq!(event_groups.get("A").unwrap().len(), 3);
        assert_eq!(event_groups.get("B").unwrap().len(), 2);
        assert_eq!(event_groups.get("C").unwrap().len(), 2);
    }

    /// Each group must produce one insert targeting its table, with one row per event
    /// (the default `extra_rows` yields a single row).
    #[test]
    fn test_build_row_inserts_request() {
        let events: Vec<Box<dyn Event>> = vec![
            Box::new(TestEventA {}),
            Box::new(TestEventB {}),
            Box::new(TestEventA {}),
            Box::new(TestEventC {}),
            Box::new(TestEventB {}),
            Box::new(TestEventC {}),
            Box::new(TestEventA {}),
        ];

        let event_groups = group_events_by_type(&events);
        assert_eq!(event_groups.len(), 3);
        assert_eq!(event_groups.get("A").unwrap().len(), 3);
        assert_eq!(event_groups.get("B").unwrap().len(), 2);
        assert_eq!(event_groups.get("C").unwrap().len(), 2);

        for (event_type, events) in event_groups {
            let row_inserts_request = build_row_inserts_request(&events).unwrap();
            if event_type == "A" {
                assert_eq!(row_inserts_request.inserts.len(), 1);
                assert_eq!(
                    row_inserts_request.inserts[0].table_name,
                    DEFAULT_EVENTS_TABLE_NAME
                );
                assert_eq!(
                    row_inserts_request.inserts[0]
                        .rows
                        .as_ref()
                        .unwrap()
                        .rows
                        .len(),
                    3
                );
            } else if event_type == "B" {
                assert_eq!(row_inserts_request.inserts.len(), 1);
                assert_eq!(row_inserts_request.inserts[0].table_name, "table_B");
                assert_eq!(
                    row_inserts_request.inserts[0]
                        .rows
                        .as_ref()
                        .unwrap()
                        .rows
                        .len(),
                    2
                );
            } else if event_type == "C" {
                assert_eq!(row_inserts_request.inserts.len(), 1);
                assert_eq!(row_inserts_request.inserts[0].table_name, "table_C");
                assert_eq!(
                    row_inserts_request.inserts[0]
                        .rows
                        .as_ref()
                        .unwrap()
                        .rows
                        .len(),
                    2
                );
            } else {
                panic!("Unexpected event type: {}", event_type);
            }
        }
    }
}