1use std::collections::HashMap;
16use std::sync::Arc;
17
18use async_stream::stream;
19use common_telemetry::{debug, error, warn};
20use futures::future::join_all;
21use snafu::OptionExt;
22use store_api::logstore::entry::Entry;
23use store_api::logstore::provider::Provider;
24use store_api::storage::RegionId;
25use tokio::sync::mpsc::{self, Receiver, Sender};
26use tokio::sync::oneshot;
27use tokio_stream::StreamExt;
28
29use crate::error::{self, Result};
30use crate::wal::entry_reader::{decode_raw_entry, WalEntryReader};
31use crate::wal::raw_entry_reader::RawEntryReader;
32use crate::wal::{EntryId, WalEntryStream};
33
34pub(crate) struct WalEntryDistributor {
36 raw_wal_reader: Arc<dyn RawEntryReader>,
37 provider: Provider,
38 senders: HashMap<RegionId, Sender<Entry>>,
40 arg_receivers: Vec<(RegionId, oneshot::Receiver<EntryId>)>,
42}
43
44impl WalEntryDistributor {
45 pub async fn distribute(mut self) -> Result<()> {
47 let arg_futures = self
48 .arg_receivers
49 .iter_mut()
50 .map(|(region_id, receiver)| async { (*region_id, receiver.await.ok()) });
51 let args = join_all(arg_futures)
52 .await
53 .into_iter()
54 .filter_map(|(region_id, start_id)| start_id.map(|start_id| (region_id, start_id)))
55 .collect::<Vec<_>>();
56
57 if args.is_empty() {
59 return Ok(());
60 }
61 let min_start_id = args.iter().map(|(_, start_id)| *start_id).min().unwrap();
63 let receivers: HashMap<_, _> = args
64 .into_iter()
65 .map(|(region_id, start_id)| {
66 (
67 region_id,
68 EntryReceiver {
69 start_id,
70 sender: self.senders[®ion_id].clone(),
71 },
72 )
73 })
74 .collect();
75
76 let mut stream = self.raw_wal_reader.read(&self.provider, min_start_id)?;
77 while let Some(entry) = stream.next().await {
78 let entry = entry?;
79 let entry_id = entry.entry_id();
80 let region_id = entry.region_id();
81
82 if let Some(EntryReceiver { sender, start_id }) = receivers.get(®ion_id) {
83 if entry_id >= *start_id {
84 if let Err(err) = sender.send(entry).await {
85 error!(err; "Failed to distribute raw entry, entry_id:{}, region_id: {}", entry_id, region_id);
86 }
87 }
88 } else {
89 debug!("Subscriber not found, region_id: {}", region_id);
90 }
91 }
92
93 Ok(())
94 }
95}
96
97#[derive(Debug)]
99pub(crate) struct WalEntryReceiver {
100 entry_receiver: Option<Receiver<Entry>>,
102 arg_sender: Option<oneshot::Sender<EntryId>>,
104}
105
106impl WalEntryReceiver {
107 pub fn new(entry_receiver: Receiver<Entry>, arg_sender: oneshot::Sender<EntryId>) -> Self {
108 Self {
109 entry_receiver: Some(entry_receiver),
110 arg_sender: Some(arg_sender),
111 }
112 }
113}
114
115impl WalEntryReader for WalEntryReceiver {
116 fn read(&mut self, _provider: &Provider, start_id: EntryId) -> Result<WalEntryStream<'static>> {
117 let arg_sender =
118 self.arg_sender
119 .take()
120 .with_context(|| error::InvalidWalReadRequestSnafu {
121 reason: format!("Call WalEntryReceiver multiple time, start_id: {start_id}"),
122 })?;
123 let mut entry_receiver = self.entry_receiver.take().unwrap();
125
126 if arg_sender.send(start_id).is_err() {
127 return error::InvalidWalReadRequestSnafu {
128 reason: format!(
129 "WalEntryDistributor is dropped, failed to send arg, start_id: {start_id}"
130 ),
131 }
132 .fail();
133 }
134
135 let stream = stream! {
136 let mut buffered_entry: Option<Entry> = None;
137 while let Some(next_entry) = entry_receiver.recv().await {
138 match buffered_entry.take() {
139 Some(entry) => {
140 if entry.is_complete() {
141 yield decode_raw_entry(entry);
142 } else {
143 warn!("Ignoring incomplete entry: {}", entry);
144 }
145 buffered_entry = Some(next_entry);
146 },
147 None => {
148 buffered_entry = Some(next_entry);
149 }
150 };
151 }
152 if let Some(entry) = buffered_entry {
153 if entry.is_complete() {
155 yield decode_raw_entry(entry);
156 } else {
157 warn!("Ignoring incomplete entry: {}", entry);
158 }
159 }
160 };
161
162 Ok(Box::pin(stream))
163 }
164}
165
166struct EntryReceiver {
167 start_id: EntryId,
168 sender: Sender<Entry>,
169}
170
/// Default capacity (in entries) for each per-region entry channel, intended as
/// the `buffer_size` argument of [build_wal_entry_distributor_and_receivers].
pub const DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE: usize = 2 * 1024;
173
174pub fn build_wal_entry_distributor_and_receivers(
191 provider: Provider,
192 raw_wal_reader: Arc<dyn RawEntryReader>,
193 region_ids: &[RegionId],
194 buffer_size: usize,
195) -> (WalEntryDistributor, Vec<WalEntryReceiver>) {
196 let mut senders = HashMap::with_capacity(region_ids.len());
197 let mut readers = Vec::with_capacity(region_ids.len());
198 let mut arg_receivers = Vec::with_capacity(region_ids.len());
199
200 for ®ion_id in region_ids {
201 let (entry_sender, entry_receiver) = mpsc::channel(buffer_size);
202 let (arg_sender, arg_receiver) = oneshot::channel();
203
204 senders.insert(region_id, entry_sender);
205 arg_receivers.push((region_id, arg_receiver));
206 readers.push(WalEntryReceiver::new(entry_receiver, arg_sender));
207 }
208
209 (
210 WalEntryDistributor {
211 provider,
212 raw_wal_reader,
213 senders,
214 arg_receivers,
215 },
216 readers,
217 )
218}
219
#[cfg(test)]
mod tests {

    use api::v1::{Mutation, OpType, WalEntry};
    use futures::{stream, TryStreamExt};
    use prost::Message;
    use store_api::logstore::entry::{Entry, MultiplePartEntry, MultiplePartHeader, NaiveEntry};

    use super::*;
    use crate::test_util::wal_util::generate_tail_corrupted_stream;
    use crate::wal::raw_entry_reader::{EntryStream, RawEntryReader};
    use crate::wal::EntryId;

    /// A [RawEntryReader] that replays a fixed list of entries, ignoring the
    /// requested provider and start id.
    struct MockRawEntryReader {
        entries: Vec<Entry>,
    }

    impl MockRawEntryReader {
        pub fn new(entries: Vec<Entry>) -> MockRawEntryReader {
            Self { entries }
        }
    }

    impl RawEntryReader for MockRawEntryReader {
        fn read(&self, _provider: &Provider, _start_id: EntryId) -> Result<EntryStream<'static>> {
            let stream = stream::iter(self.entries.clone().into_iter().map(Ok));
            Ok(Box::pin(stream))
        }
    }

    /// Builds a [WalEntry] holding a single `Put` mutation with the given `sequence`.
    fn put_wal_entry(sequence: u64) -> WalEntry {
        WalEntry {
            mutations: vec![Mutation {
                op_type: OpType::Put as i32,
                sequence,
                rows: None,
                write_hint: None,
            }],
            bulk_entries: vec![],
        }
    }

    /// Builds a complete naive entry whose payload is the encoded `put_wal_entry(sequence)`.
    fn naive_put_entry(
        provider: &Provider,
        region_id: RegionId,
        entry_id: EntryId,
        sequence: u64,
    ) -> Entry {
        Entry::Naive(NaiveEntry {
            provider: provider.clone(),
            region_id,
            entry_id,
            data: put_wal_entry(sequence).encode_to_vec(),
        })
    }

    #[tokio::test]
    async fn test_wal_entry_distributor_without_receivers() {
        let provider = Provider::kafka_provider("my_topic".to_string());
        let reader = Arc::new(MockRawEntryReader::new(vec![Entry::Naive(NaiveEntry {
            region_id: RegionId::new(1024, 1),
            provider: provider.clone(),
            entry_id: 1,
            data: vec![1],
        })]));

        let (distributor, receivers) = build_wal_entry_distributor_and_receivers(
            provider,
            reader,
            &[RegionId::new(1024, 1), RegionId::new(1025, 1)],
            128,
        );

        // With every receiver dropped there is no subscriber, so `distribute`
        // returns without reading the WAL.
        drop(receivers);
        distributor.distribute().await.unwrap();
    }

    #[tokio::test]
    async fn test_wal_entry_distributor() {
        common_telemetry::init_default_ut_logging();
        let provider = Provider::kafka_provider("my_topic".to_string());
        let reader = Arc::new(MockRawEntryReader::new(vec![
            naive_put_entry(&provider, RegionId::new(1024, 1), 1, 1),
            naive_put_entry(&provider, RegionId::new(1024, 2), 2, 2),
            naive_put_entry(&provider, RegionId::new(1024, 3), 3, 3),
        ]));

        let (distributor, mut receivers) = build_wal_entry_distributor_and_receivers(
            provider.clone(),
            reader,
            &[
                RegionId::new(1024, 1),
                RegionId::new(1024, 2),
                RegionId::new(1024, 3),
            ],
            128,
        );
        assert_eq!(receivers.len(), 3);

        // Drop the receiver of region 3 without reading it: the distributor must
        // skip that region rather than wait for it.
        let last = receivers.pop().unwrap();
        drop(last);

        let mut streams = receivers
            .iter_mut()
            .map(|receiver| receiver.read(&provider, 0).unwrap())
            .collect::<Vec<_>>();
        distributor.distribute().await.unwrap();

        let entries = streams
            .get_mut(0)
            .unwrap()
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        assert_eq!(entries, vec![(1, put_wal_entry(1))]);

        let entries = streams
            .get_mut(1)
            .unwrap()
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        assert_eq!(entries, vec![(2, put_wal_entry(2))]);
    }

    #[tokio::test]
    async fn test_tail_corrupted_stream() {
        common_telemetry::init_default_ut_logging();
        let mut entries = vec![];
        let region1 = RegionId::new(1, 1);
        let region1_expected_wal_entry = put_wal_entry(1);
        let region2 = RegionId::new(1, 2);
        let region2_expected_wal_entry = put_wal_entry(2);
        let region3 = RegionId::new(1, 3);
        let region3_expected_wal_entry = put_wal_entry(3);
        let provider = Provider::kafka_provider("my_topic".to_string());
        entries.extend(generate_tail_corrupted_stream(
            provider.clone(),
            region1,
            &region1_expected_wal_entry,
            3,
        ));
        entries.extend(generate_tail_corrupted_stream(
            provider.clone(),
            region2,
            &region2_expected_wal_entry,
            2,
        ));
        entries.extend(generate_tail_corrupted_stream(
            provider.clone(),
            region3,
            &region3_expected_wal_entry,
            4,
        ));

        let corrupted_stream = MockRawEntryReader { entries };
        let (distributor, mut receivers) = build_wal_entry_distributor_and_receivers(
            provider.clone(),
            Arc::new(corrupted_stream),
            &[region1, region2, region3],
            128,
        );
        assert_eq!(receivers.len(), 3);
        let mut streams = receivers
            .iter_mut()
            .map(|receiver| receiver.read(&provider, 0).unwrap())
            .collect::<Vec<_>>();
        distributor.distribute().await.unwrap();

        assert_eq!(
            streams
                .get_mut(0)
                .unwrap()
                .try_collect::<Vec<_>>()
                .await
                .unwrap(),
            vec![(0, region1_expected_wal_entry)]
        );

        assert_eq!(
            streams
                .get_mut(1)
                .unwrap()
                .try_collect::<Vec<_>>()
                .await
                .unwrap(),
            vec![(0, region2_expected_wal_entry)]
        );

        assert_eq!(
            streams
                .get_mut(2)
                .unwrap()
                .try_collect::<Vec<_>>()
                .await
                .unwrap(),
            vec![(0, region3_expected_wal_entry)]
        );
    }

    #[tokio::test]
    async fn test_part_corrupted_stream() {
        common_telemetry::init_default_ut_logging();
        let mut entries = vec![];
        let region1 = RegionId::new(1, 1);
        let region1_expected_wal_entry = put_wal_entry(1);
        let region2 = RegionId::new(1, 2);
        let provider = Provider::kafka_provider("my_topic".to_string());
        entries.extend(generate_tail_corrupted_stream(
            provider.clone(),
            region1,
            &region1_expected_wal_entry,
            3,
        ));
        // Region 2 only has incomplete entries, which must all be skipped.
        entries.extend(vec![
            Entry::MultiplePart(MultiplePartEntry {
                provider: provider.clone(),
                region_id: region2,
                entry_id: 0,
                headers: vec![MultiplePartHeader::First],
                parts: vec![vec![1; 100]],
            }),
            Entry::MultiplePart(MultiplePartEntry {
                provider: provider.clone(),
                region_id: region2,
                entry_id: 0,
                headers: vec![MultiplePartHeader::First],
                parts: vec![vec![1; 100]],
            }),
        ]);

        let corrupted_stream = MockRawEntryReader { entries };
        let (distributor, mut receivers) = build_wal_entry_distributor_and_receivers(
            provider.clone(),
            Arc::new(corrupted_stream),
            &[region1, region2],
            128,
        );
        assert_eq!(receivers.len(), 2);
        let mut streams = receivers
            .iter_mut()
            .map(|receiver| receiver.read(&provider, 0).unwrap())
            .collect::<Vec<_>>();
        distributor.distribute().await.unwrap();
        assert_eq!(
            streams
                .get_mut(0)
                .unwrap()
                .try_collect::<Vec<_>>()
                .await
                .unwrap(),
            vec![(0, region1_expected_wal_entry)]
        );

        assert_eq!(
            streams
                .get_mut(1)
                .unwrap()
                .try_collect::<Vec<_>>()
                .await
                .unwrap(),
            vec![]
        );
    }

    #[tokio::test]
    async fn test_wal_entry_receiver_start_id() {
        let provider = Provider::kafka_provider("my_topic".to_string());
        let reader = Arc::new(MockRawEntryReader::new(vec![
            naive_put_entry(&provider, RegionId::new(1024, 1), 1, 1),
            naive_put_entry(&provider, RegionId::new(1024, 2), 2, 2),
            naive_put_entry(&provider, RegionId::new(1024, 1), 3, 3),
            naive_put_entry(&provider, RegionId::new(1024, 2), 4, 4),
        ]));

        let (distributor, mut receivers) = build_wal_entry_distributor_and_receivers(
            provider.clone(),
            reader,
            &[RegionId::new(1024, 1), RegionId::new(1024, 2)],
            128,
        );
        assert_eq!(receivers.len(), 2);
        let mut streams = receivers
            .iter_mut()
            .map(|receiver| receiver.read(&provider, 4).unwrap())
            .collect::<Vec<_>>();
        distributor.distribute().await.unwrap();

        // Region 1 only has entries 1 and 3, both below the start id 4.
        let region1_entries = streams
            .get_mut(0)
            .unwrap()
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        assert!(region1_entries.is_empty());

        assert_eq!(
            streams
                .get_mut(1)
                .unwrap()
                .try_collect::<Vec<_>>()
                .await
                .unwrap(),
            vec![(4, put_wal_entry(4))]
        );
    }
}