1use std::any::Any;
16use std::collections::HashMap;
17use std::fmt::Debug;
18use std::sync::Arc;
19use std::time::Duration;
20
21use api::v1::column_data_type_extension::TypeExt;
22use api::v1::value::ValueData;
23use api::v1::{
24 ColumnDataType, ColumnDataTypeExtension, ColumnSchema, JsonTypeExtension, Row,
25 RowInsertRequest, RowInsertRequests, Rows, SemanticType,
26};
27use async_trait::async_trait;
28use backon::{BackoffBuilder, ExponentialBuilder};
29use common_telemetry::{debug, error, info, warn};
30use common_time::timestamp::{TimeUnit, Timestamp};
31use humantime::format_duration;
32use itertools::Itertools;
33use serde::{Deserialize, Serialize};
34use store_api::mito_engine_options::{APPEND_MODE_KEY, TTL_KEY};
35use tokio::sync::mpsc::{channel, Receiver, Sender};
36use tokio::task::JoinHandle;
37use tokio::time::sleep;
38use tokio_util::sync::CancellationToken;
39
40use crate::error::{MismatchedSchemaSnafu, Result};
41
/// Table name returned by the default implementation of `Event::table_name`.
pub const DEFAULT_EVENTS_TABLE_NAME: &str = "events";

/// Name of the column that stores the event type (written as a string tag).
pub const EVENTS_TABLE_TYPE_COLUMN_NAME: &str = "type";
/// Name of the column that stores the JSON payload of the event.
pub const EVENTS_TABLE_PAYLOAD_COLUMN_NAME: &str = "payload";
/// Name of the column that stores the event timestamp (nanosecond precision).
pub const EVENTS_TABLE_TIMESTAMP_COLUMN_NAME: &str = "timestamp";

/// Shared, reference-counted handle to an [`EventRecorder`] trait object.
pub type EventRecorderRef = Arc<dyn EventRecorder>;

/// Interval (5 seconds) at which `EventRecorderImpl` flushes buffered events.
/// It is also the base from which the retry backoff delays are derived.
pub const DEFAULT_FLUSH_INTERVAL_SECONDS: Duration = Duration::from_secs(5);
/// Default TTL (90 days) used by `EventHandlerOptions` and `EventRecorderOptions`.
const DEFAULT_EVENTS_TABLE_TTL: Duration = Duration::from_days(90);
/// Capacity of the channel between `EventRecorderImpl` and its background processor.
const DEFAULT_CHANNEL_SIZE: usize = 2048;
/// Number of buffered events at which the background processor flushes early.
const DEFAULT_BUFFER_SIZE: usize = 100;
/// Maximum number of retries when the event handler fails to handle a batch.
const DEFAULT_MAX_RETRY_TIMES: u64 = 3;
65
66pub trait Event: Send + Sync + Debug {
75 fn table_name(&self) -> &str {
77 DEFAULT_EVENTS_TABLE_NAME
78 }
79
80 fn event_type(&self) -> &str;
82
83 fn timestamp(&self) -> Timestamp {
85 Timestamp::current_time(TimeUnit::Nanosecond)
86 }
87
88 fn json_payload(&self) -> Result<String> {
90 Ok("".to_string())
91 }
92
93 fn extra_schema(&self) -> Vec<ColumnSchema> {
95 vec![]
96 }
97
98 fn extra_row(&self) -> Result<Row> {
100 Ok(Row { values: vec![] })
101 }
102
103 fn as_any(&self) -> &dyn Any;
105}
106
/// A type that can optionally be converted into an [`Event`].
pub trait Eventable: Send + Sync + Debug {
    /// Converts `self` into an event.
    ///
    /// The default implementation returns `None`, i.e. no event is produced.
    fn to_event(&self) -> Option<Box<dyn Event>> {
        None
    }
}
114
115#[allow(clippy::borrowed_box)]
117pub fn group_events_by_type(events: &[Box<dyn Event>]) -> HashMap<&str, Vec<&Box<dyn Event>>> {
118 events
119 .iter()
120 .into_grouping_map_by(|event| event.event_type())
121 .collect()
122}
123
124#[allow(clippy::borrowed_box)]
126pub fn build_row_inserts_request(events: &[&Box<dyn Event>]) -> Result<RowInsertRequests> {
127 validate_events(events)?;
129
130 let event = &events[0];
132 let mut schema: Vec<ColumnSchema> = Vec::with_capacity(3 + event.extra_schema().len());
133 schema.extend(vec![
134 ColumnSchema {
135 column_name: EVENTS_TABLE_TYPE_COLUMN_NAME.to_string(),
136 datatype: ColumnDataType::String.into(),
137 semantic_type: SemanticType::Tag.into(),
138 ..Default::default()
139 },
140 ColumnSchema {
141 column_name: EVENTS_TABLE_PAYLOAD_COLUMN_NAME.to_string(),
142 datatype: ColumnDataType::Binary as i32,
143 semantic_type: SemanticType::Field as i32,
144 datatype_extension: Some(ColumnDataTypeExtension {
145 type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
146 }),
147 ..Default::default()
148 },
149 ColumnSchema {
150 column_name: EVENTS_TABLE_TIMESTAMP_COLUMN_NAME.to_string(),
151 datatype: ColumnDataType::TimestampNanosecond.into(),
152 semantic_type: SemanticType::Timestamp.into(),
153 ..Default::default()
154 },
155 ]);
156 schema.extend(event.extra_schema());
157
158 let mut rows: Vec<Row> = Vec::with_capacity(events.len());
159 for event in events {
160 let extra_row = event.extra_row()?;
161 let mut values = Vec::with_capacity(3 + extra_row.values.len());
162 values.extend([
163 ValueData::StringValue(event.event_type().to_string()).into(),
164 ValueData::BinaryValue(event.json_payload()?.into_bytes()).into(),
165 ValueData::TimestampNanosecondValue(event.timestamp().value()).into(),
166 ]);
167 values.extend(extra_row.values);
168 rows.push(Row { values });
169 }
170
171 Ok(RowInsertRequests {
172 inserts: vec![RowInsertRequest {
173 table_name: event.table_name().to_string(),
174 rows: Some(Rows { schema, rows }),
175 }],
176 })
177}
178
179#[allow(clippy::borrowed_box)]
181fn validate_events(events: &[&Box<dyn Event>]) -> Result<()> {
182 let extra_schema = events[0].extra_schema();
184 for event in events {
185 if event.extra_schema() != extra_schema {
186 MismatchedSchemaSnafu {
187 expected: extra_schema.clone(),
188 actual: event.extra_schema(),
189 }
190 .fail()?;
191 }
192 }
193 Ok(())
194}
195
/// Records events.
pub trait EventRecorder: Send + Sync + Debug + 'static {
    /// Records an event. The method is synchronous and returns no result, so
    /// implementations have to deal with failures themselves.
    fn record(&self, event: Box<dyn Event>);

    /// Closes the recorder.
    fn close(&self);
}
204
/// Options that `to_hints` turns into key/value hints.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct EventHandlerOptions {
    /// TTL value; emitted under `TTL_KEY` in human-readable form.
    pub ttl: Duration,
    /// Append-mode flag; emitted under `APPEND_MODE_KEY`.
    pub append_mode: bool,
}
213
impl Default for EventHandlerOptions {
    /// Uses `DEFAULT_EVENTS_TABLE_TTL` as the TTL and enables append mode.
    fn default() -> Self {
        Self {
            ttl: DEFAULT_EVENTS_TABLE_TTL,
            append_mode: true,
        }
    }
}
222
223impl EventHandlerOptions {
224 pub fn to_hints(&self) -> Vec<(&str, String)> {
226 vec![
227 (TTL_KEY, format_duration(self.ttl).to_string()),
228 (APPEND_MODE_KEY, self.append_mode.to_string()),
229 ]
230 }
231}
232
/// Handles batches of recorded events.
#[async_trait]
pub trait EventHandler: Send + Sync + 'static {
    /// Handles a batch of events.
    ///
    /// The background processor in this file passes its buffered events here
    /// and retries the same batch when an error is returned.
    async fn handle(&self, events: &[Box<dyn Event>]) -> Result<()>;
}
240
/// Configuration options of the event recorder.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct EventRecorderOptions {
    /// TTL value, (de)serialized through `humantime_serde`.
    // NOTE(review): presumably the TTL of the events table; it is not consumed
    // anywhere in this file, so confirm against the callers.
    #[serde(with = "humantime_serde")]
    pub ttl: Duration,
}
248
impl Default for EventRecorderOptions {
    /// Uses `DEFAULT_EVENTS_TABLE_TTL` as the TTL.
    fn default() -> Self {
        Self {
            ttl: DEFAULT_EVENTS_TABLE_TTL,
        }
    }
}
256
/// [`EventRecorder`] implementation that forwards events over a channel to a
/// background task, which batches them and passes them to an `EventHandler`.
#[derive(Debug)]
pub struct EventRecorderImpl {
    /// Sending half of the channel to the background processor.
    tx: Sender<Box<dyn Event>>,
    /// Token used to ask the background processor to flush and stop.
    cancel_token: CancellationToken,
    /// Handle of the spawned background processor task.
    handle: Option<JoinHandle<()>>,
}
267
268impl EventRecorderImpl {
269 pub fn new(event_handler: Box<dyn EventHandler>) -> Self {
270 let (tx, rx) = channel(DEFAULT_CHANNEL_SIZE);
271 let cancel_token = CancellationToken::new();
272
273 let mut recorder = Self {
274 tx,
275 handle: None,
276 cancel_token: cancel_token.clone(),
277 };
278
279 let processor = EventProcessor::new(
280 rx,
281 event_handler,
282 DEFAULT_FLUSH_INTERVAL_SECONDS,
283 DEFAULT_MAX_RETRY_TIMES,
284 )
285 .with_cancel_token(cancel_token);
286
287 let handle = tokio::spawn(async move {
289 processor.process(DEFAULT_BUFFER_SIZE).await;
290 });
291
292 recorder.handle = Some(handle);
293
294 recorder
295 }
296}
297
298impl EventRecorder for EventRecorderImpl {
299 fn record(&self, event: Box<dyn Event>) {
301 if let Err(e) = self.tx.try_send(event) {
302 error!("Failed to send event to the background processor: {}", e);
303 }
304 }
305
306 fn close(&self) {
308 self.cancel_token.cancel();
309 }
310}
311
312impl Drop for EventRecorderImpl {
313 fn drop(&mut self) {
314 if let Some(handle) = self.handle.take() {
315 handle.abort();
316 info!("Aborted the background processor in event recorder");
317 }
318 }
319}
320
/// Background worker that receives events from a channel, buffers them and
/// flushes the buffer to an `EventHandler`.
struct EventProcessor {
    /// Receiving half of the event channel.
    rx: Receiver<Box<dyn Event>>,
    /// Handler the buffered events are flushed to.
    event_handler: Box<dyn EventHandler>,
    /// Maximum number of retries for a failed flush.
    max_retry_times: u64,
    /// Interval between periodic flushes.
    process_interval: Duration,
    /// Token that stops the processing loop once cancelled.
    cancel_token: CancellationToken,
}
328
329impl EventProcessor {
330 fn new(
331 rx: Receiver<Box<dyn Event>>,
332 event_handler: Box<dyn EventHandler>,
333 process_interval: Duration,
334 max_retry_times: u64,
335 ) -> Self {
336 Self {
337 rx,
338 event_handler,
339 max_retry_times,
340 process_interval,
341 cancel_token: CancellationToken::new(),
342 }
343 }
344
345 fn with_cancel_token(mut self, cancel_token: CancellationToken) -> Self {
346 self.cancel_token = cancel_token;
347 self
348 }
349
350 async fn process(mut self, buffer_size: usize) {
351 info!("Start the background processor in event recorder to handle the received events.");
352
353 let mut buffer = Vec::with_capacity(buffer_size);
354 let mut interval = tokio::time::interval(self.process_interval);
355
356 loop {
357 tokio::select! {
358 maybe_event = self.rx.recv() => {
359 if let Some(maybe_event) = maybe_event {
360 debug!("Received event: {:?}", maybe_event);
361
362 if buffer.len() >= buffer_size {
363 debug!(
364 "Flushing events to the event handler because the buffer is full with {} events",
365 buffer.len()
366 );
367 self.flush_events_to_handler(&mut buffer).await;
368 }
369
370 buffer.push(maybe_event);
372 } else {
373 self.flush_events_to_handler(&mut buffer).await;
375 break;
376 }
377 }
378 _ = self.cancel_token.cancelled() => {
380 warn!("Received a cancel signal, flushing the buffer and exiting the loop");
381 self.flush_events_to_handler(&mut buffer).await;
382 break;
383 }
384 _ = interval.tick() => {
386 self.flush_events_to_handler(&mut buffer).await;
387 }
388 }
389 }
390 }
391
392 async fn flush_events_to_handler(&self, buffer: &mut Vec<Box<dyn Event>>) {
394 if !buffer.is_empty() {
395 debug!("Flushing {} events to the event handler", buffer.len());
396
397 let mut backoff = ExponentialBuilder::default()
398 .with_min_delay(Duration::from_millis(
399 DEFAULT_FLUSH_INTERVAL_SECONDS.as_millis() as u64 / self.max_retry_times.max(1),
400 ))
401 .with_max_delay(Duration::from_millis(
402 DEFAULT_FLUSH_INTERVAL_SECONDS.as_millis() as u64,
403 ))
404 .with_max_times(self.max_retry_times as usize)
405 .build();
406
407 loop {
408 match self.event_handler.handle(buffer).await {
409 Ok(()) => {
410 debug!("Successfully handled {} events", buffer.len());
411 break;
412 }
413 Err(e) => {
414 if let Some(d) = backoff.next() {
415 warn!(e; "Failed to handle events, retrying...");
416 sleep(d).await;
417 continue;
418 } else {
419 warn!(
420 e; "Failed to handle events after {} retries",
421 self.max_retry_times
422 );
423 break;
424 }
425 }
426 }
427 }
428 }
429
430 buffer.clear();
432 }
433}
434
#[cfg(test)]
mod tests {
    use super::*;

    /// Event with the default table name and a fixed JSON payload.
    #[derive(Debug)]
    struct TestEvent {}

    impl Event for TestEvent {
        fn event_type(&self) -> &str {
            "test_event"
        }

        fn json_payload(&self) -> Result<String> {
            Ok("{\"procedure_id\": \"1234567890\"}".to_string())
        }

        fn as_any(&self) -> &dyn Any {
            self
        }
    }

    /// Handler that expects the first event of each batch to be a [`TestEvent`]
    /// with its original payload and type.
    struct TestEventHandlerImpl {}

    #[async_trait]
    impl EventHandler for TestEventHandlerImpl {
        async fn handle(&self, events: &[Box<dyn Event>]) -> Result<()> {
            let first = events.first().unwrap();
            let event = first.as_any().downcast_ref::<TestEvent>().unwrap();

            assert_eq!(
                event.json_payload().unwrap(),
                "{\"procedure_id\": \"1234567890\"}"
            );
            assert_eq!(event.event_type(), "test_event");
            Ok(())
        }
    }

    #[tokio::test]
    async fn test_event_recorder() {
        let mut event_recorder = EventRecorderImpl::new(Box::new(TestEventHandlerImpl {}));
        event_recorder.record(Box::new(TestEvent {}));

        // Give the background processor time to receive the event.
        sleep(Duration::from_millis(500)).await;

        event_recorder.close();

        // Give the background processor time to flush and exit.
        sleep(Duration::from_millis(500)).await;

        let Some(handle) = event_recorder.handle.take() else {
            return;
        };
        assert!(handle.await.is_ok());
    }

    /// Handler whose assertions do not match [`TestEvent`], so handling a batch
    /// panics inside the background task.
    struct TestEventHandlerImplShouldPanic {}

    #[async_trait]
    impl EventHandler for TestEventHandlerImplShouldPanic {
        async fn handle(&self, events: &[Box<dyn Event>]) -> Result<()> {
            let first = events.first().unwrap();
            let event = first.as_any().downcast_ref::<TestEvent>().unwrap();

            // Both expectations deliberately differ from what `TestEvent` returns.
            assert_eq!(
                event.json_payload().unwrap(),
                "{\"procedure_id\": \"should_panic\"}"
            );
            assert_eq!(event.event_type(), "should_panic");
            Ok(())
        }
    }

    #[tokio::test]
    async fn test_event_recorder_should_panic() {
        let mut event_recorder =
            EventRecorderImpl::new(Box::new(TestEventHandlerImplShouldPanic {}));

        event_recorder.record(Box::new(TestEvent {}));

        // Give the background processor time to receive the event.
        sleep(Duration::from_millis(500)).await;

        event_recorder.close();

        // Give the background processor time to flush, which panics in the handler.
        sleep(Duration::from_millis(500)).await;

        let Some(handle) = event_recorder.handle.take() else {
            return;
        };
        assert!(handle.await.unwrap_err().is_panic());
    }

    /// Event of type "A" that is written to the default table.
    #[derive(Debug)]
    struct TestEventA {}

    impl Event for TestEventA {
        fn event_type(&self) -> &str {
            "A"
        }

        fn as_any(&self) -> &dyn Any {
            self
        }
    }

    /// Event of type "B" that is written to "table_B".
    #[derive(Debug)]
    struct TestEventB {}

    impl Event for TestEventB {
        fn table_name(&self) -> &str {
            "table_B"
        }

        fn event_type(&self) -> &str {
            "B"
        }

        fn as_any(&self) -> &dyn Any {
            self
        }
    }

    /// Event of type "C" that is written to "table_C".
    #[derive(Debug)]
    struct TestEventC {}

    impl Event for TestEventC {
        fn table_name(&self) -> &str {
            "table_C"
        }

        fn event_type(&self) -> &str {
            "C"
        }

        fn as_any(&self) -> &dyn Any {
            self
        }
    }

    /// Interleaved events: three of type "A", two of type "B" and two of type "C".
    fn mixed_events() -> Vec<Box<dyn Event>> {
        let events: Vec<Box<dyn Event>> = vec![
            Box::new(TestEventA {}),
            Box::new(TestEventB {}),
            Box::new(TestEventA {}),
            Box::new(TestEventC {}),
            Box::new(TestEventB {}),
            Box::new(TestEventC {}),
            Box::new(TestEventA {}),
        ];
        events
    }

    #[test]
    fn test_group_events_by_type() {
        let events = mixed_events();

        let event_groups = group_events_by_type(&events);
        assert_eq!(event_groups.len(), 3);
        for (event_type, expected_len) in [("A", 3), ("B", 2), ("C", 2)] {
            assert_eq!(event_groups.get(event_type).unwrap().len(), expected_len);
        }
    }

    #[test]
    fn test_build_row_inserts_request() {
        let events = mixed_events();

        let event_groups = group_events_by_type(&events);
        assert_eq!(event_groups.len(), 3);
        for (event_type, expected_len) in [("A", 3), ("B", 2), ("C", 2)] {
            assert_eq!(event_groups.get(event_type).unwrap().len(), expected_len);
        }

        for (event_type, events) in event_groups {
            let row_inserts_request = build_row_inserts_request(&events).unwrap();

            // Expected table name and number of rows for each event type.
            let (expected_table_name, expected_rows) = match event_type {
                "A" => (DEFAULT_EVENTS_TABLE_NAME, 3),
                "B" => ("table_B", 2),
                "C" => ("table_C", 2),
                _ => panic!("Unexpected event type: {}", event_type),
            };

            assert_eq!(row_inserts_request.inserts.len(), 1);
            let insert = &row_inserts_request.inserts[0];
            assert_eq!(insert.table_name, expected_table_name);
            assert_eq!(insert.rows.as_ref().unwrap().rows.len(), expected_rows);
        }
    }
}