mito2/wal/
entry_distributor.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use async_stream::stream;
19use common_telemetry::{debug, error, warn};
20use futures::future::join_all;
21use snafu::OptionExt;
22use store_api::logstore::entry::Entry;
23use store_api::logstore::provider::Provider;
24use store_api::storage::RegionId;
25use tokio::sync::mpsc::{self, Receiver, Sender};
26use tokio::sync::oneshot;
27use tokio_stream::StreamExt;
28
29use crate::error::{self, Result};
30use crate::wal::entry_reader::{decode_raw_entry, WalEntryReader};
31use crate::wal::raw_entry_reader::RawEntryReader;
32use crate::wal::{EntryId, WalEntryStream};
33
34/// [WalEntryDistributor] distributes Wal entries to specific [WalEntryReceiver]s based on [RegionId].
35pub(crate) struct WalEntryDistributor {
36    raw_wal_reader: Arc<dyn RawEntryReader>,
37    provider: Provider,
38    /// Sends [Entry] to receivers based on [RegionId]
39    senders: HashMap<RegionId, Sender<Entry>>,
40    /// Waits for the arg from the [WalEntryReader].
41    arg_receivers: Vec<(RegionId, oneshot::Receiver<EntryId>)>,
42}
43
44impl WalEntryDistributor {
45    /// Distributes entries to specific [WalEntryReceiver]s based on [RegionId].
46    pub async fn distribute(mut self) -> Result<()> {
47        let arg_futures = self
48            .arg_receivers
49            .iter_mut()
50            .map(|(region_id, receiver)| async { (*region_id, receiver.await.ok()) });
51        let args = join_all(arg_futures)
52            .await
53            .into_iter()
54            .filter_map(|(region_id, start_id)| start_id.map(|start_id| (region_id, start_id)))
55            .collect::<Vec<_>>();
56
57        // No subscribers
58        if args.is_empty() {
59            return Ok(());
60        }
61        // Safety: must exist
62        let min_start_id = args.iter().map(|(_, start_id)| *start_id).min().unwrap();
63        let receivers: HashMap<_, _> = args
64            .into_iter()
65            .map(|(region_id, start_id)| {
66                (
67                    region_id,
68                    EntryReceiver {
69                        start_id,
70                        sender: self.senders[&region_id].clone(),
71                    },
72                )
73            })
74            .collect();
75
76        let mut stream = self.raw_wal_reader.read(&self.provider, min_start_id)?;
77        while let Some(entry) = stream.next().await {
78            let entry = entry?;
79            let entry_id = entry.entry_id();
80            let region_id = entry.region_id();
81
82            if let Some(EntryReceiver { sender, start_id }) = receivers.get(&region_id) {
83                if entry_id >= *start_id {
84                    if let Err(err) = sender.send(entry).await {
85                        error!(err; "Failed to distribute raw entry, entry_id:{}, region_id: {}", entry_id, region_id);
86                    }
87                }
88            } else {
89                debug!("Subscriber not found, region_id: {}", region_id);
90            }
91        }
92
93        Ok(())
94    }
95}
96
97/// Receives the Wal entries from [WalEntryDistributor].
98#[derive(Debug)]
99pub(crate) struct WalEntryReceiver {
100    /// Receives the [Entry] from the [WalEntryDistributor].
101    entry_receiver: Option<Receiver<Entry>>,
102    /// Sends the `start_id` to the [WalEntryDistributor].
103    arg_sender: Option<oneshot::Sender<EntryId>>,
104}
105
106impl WalEntryReceiver {
107    pub fn new(entry_receiver: Receiver<Entry>, arg_sender: oneshot::Sender<EntryId>) -> Self {
108        Self {
109            entry_receiver: Some(entry_receiver),
110            arg_sender: Some(arg_sender),
111        }
112    }
113}
114
115impl WalEntryReader for WalEntryReceiver {
116    fn read(&mut self, _provider: &Provider, start_id: EntryId) -> Result<WalEntryStream<'static>> {
117        let arg_sender =
118            self.arg_sender
119                .take()
120                .with_context(|| error::InvalidWalReadRequestSnafu {
121                    reason: format!("Call WalEntryReceiver multiple time, start_id: {start_id}"),
122                })?;
123        // Safety: check via arg_sender
124        let mut entry_receiver = self.entry_receiver.take().unwrap();
125
126        if arg_sender.send(start_id).is_err() {
127            return error::InvalidWalReadRequestSnafu {
128                reason: format!(
129                    "WalEntryDistributor is dropped, failed to send arg, start_id: {start_id}"
130                ),
131            }
132            .fail();
133        }
134
135        let stream = stream! {
136            let mut buffered_entry: Option<Entry> = None;
137            while let Some(next_entry) = entry_receiver.recv().await {
138                match buffered_entry.take() {
139                    Some(entry) => {
140                        if entry.is_complete() {
141                            yield decode_raw_entry(entry);
142                        } else {
143                            warn!("Ignoring incomplete entry: {}", entry);
144                        }
145                        buffered_entry = Some(next_entry);
146                    },
147                    None => {
148                        buffered_entry = Some(next_entry);
149                    }
150                };
151            }
152            if let Some(entry) = buffered_entry {
153                // Ignores tail corrupted data.
154                if entry.is_complete() {
155                    yield decode_raw_entry(entry);
156                } else {
157                    warn!("Ignoring incomplete entry: {}", entry);
158                }
159            }
160        };
161
162        Ok(Box::pin(stream))
163    }
164}
165
166struct EntryReceiver {
167    start_id: EntryId,
168    sender: Sender<Entry>,
169}
170
/// Default buffer size of the [Entry] receiver channel.
pub const DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE: usize = 2_048;
173
174/// Returns [WalEntryDistributor] and batch [WalEntryReceiver]s.
175///
176/// ### Note:
177/// Ensures `receiver.read` is called before the `distributor.distribute` in the same thread.
178///
179/// ```text
180/// let (distributor, receivers) = build_wal_entry_distributor_and_receivers(..);
181///  Thread 1                        |
182///                                  |
183/// // may deadlock                  |
184/// distributor.distribute().await;  |
185///                                  |  
186///                                  |
187/// receivers[0].read().await        |
188/// ```
189///
190pub fn build_wal_entry_distributor_and_receivers(
191    provider: Provider,
192    raw_wal_reader: Arc<dyn RawEntryReader>,
193    region_ids: &[RegionId],
194    buffer_size: usize,
195) -> (WalEntryDistributor, Vec<WalEntryReceiver>) {
196    let mut senders = HashMap::with_capacity(region_ids.len());
197    let mut readers = Vec::with_capacity(region_ids.len());
198    let mut arg_receivers = Vec::with_capacity(region_ids.len());
199
200    for &region_id in region_ids {
201        let (entry_sender, entry_receiver) = mpsc::channel(buffer_size);
202        let (arg_sender, arg_receiver) = oneshot::channel();
203
204        senders.insert(region_id, entry_sender);
205        arg_receivers.push((region_id, arg_receiver));
206        readers.push(WalEntryReceiver::new(entry_receiver, arg_sender));
207    }
208
209    (
210        WalEntryDistributor {
211            provider,
212            raw_wal_reader,
213            senders,
214            arg_receivers,
215        },
216        readers,
217    )
218}
219
#[cfg(test)]
mod tests {

    use api::v1::{Mutation, OpType, WalEntry};
    use futures::{stream, TryStreamExt};
    use prost::Message;
    use store_api::logstore::entry::{Entry, MultiplePartEntry, MultiplePartHeader, NaiveEntry};

    use super::*;
    use crate::test_util::wal_util::generate_tail_corrupted_stream;
    use crate::wal::raw_entry_reader::{EntryStream, RawEntryReader};
    use crate::wal::EntryId;

    /// A [RawEntryReader] that replays a fixed list of entries and ignores `start_id`.
    struct MockRawEntryReader {
        entries: Vec<Entry>,
    }

    impl MockRawEntryReader {
        pub fn new(entries: Vec<Entry>) -> MockRawEntryReader {
            Self { entries }
        }
    }

    impl RawEntryReader for MockRawEntryReader {
        fn read(&self, _provider: &Provider, _start_id: EntryId) -> Result<EntryStream<'static>> {
            let stream = stream::iter(self.entries.clone().into_iter().map(Ok));
            Ok(Box::pin(stream))
        }
    }

    /// Builds a [WalEntry] holding a single `Put` mutation with the given `sequence`.
    fn put_wal_entry(sequence: u64) -> WalEntry {
        WalEntry {
            mutations: vec![Mutation {
                op_type: OpType::Put as i32,
                sequence,
                rows: None,
                write_hint: None,
            }],
            bulk_entries: vec![],
        }
    }

    /// Builds a complete [Entry::Naive] whose payload is `put_wal_entry(sequence)` encoded.
    fn naive_put_entry(
        provider: &Provider,
        region_id: RegionId,
        entry_id: EntryId,
        sequence: u64,
    ) -> Entry {
        Entry::Naive(NaiveEntry {
            provider: provider.clone(),
            region_id,
            entry_id,
            data: put_wal_entry(sequence).encode_to_vec(),
        })
    }

    #[tokio::test]
    async fn test_wal_entry_distributor_without_receivers() {
        let provider = Provider::kafka_provider("my_topic".to_string());
        let reader = Arc::new(MockRawEntryReader::new(vec![Entry::Naive(NaiveEntry {
            region_id: RegionId::new(1024, 1),
            provider: provider.clone(),
            entry_id: 1,
            data: vec![1],
        })]));

        let (distributor, receivers) = build_wal_entry_distributor_and_receivers(
            provider,
            reader,
            &[RegionId::new(1024, 1), RegionId::new(1025, 1)],
            128,
        );

        // Drops all receivers
        drop(receivers);
        // Returns immediately
        distributor.distribute().await.unwrap();
    }

    #[tokio::test]
    async fn test_wal_entry_distributor() {
        common_telemetry::init_default_ut_logging();
        let provider = Provider::kafka_provider("my_topic".to_string());
        let reader = Arc::new(MockRawEntryReader::new(vec![
            naive_put_entry(&provider, RegionId::new(1024, 1), 1, 1),
            naive_put_entry(&provider, RegionId::new(1024, 2), 2, 2),
            naive_put_entry(&provider, RegionId::new(1024, 3), 3, 3),
        ]));

        // Builds distributor and receivers
        let (distributor, mut receivers) = build_wal_entry_distributor_and_receivers(
            provider.clone(),
            reader,
            &[
                RegionId::new(1024, 1),
                RegionId::new(1024, 2),
                RegionId::new(1024, 3),
            ],
            128,
        );
        assert_eq!(receivers.len(), 3);

        // Should be okay if one of receiver is dropped.
        let last = receivers.pop().unwrap();
        drop(last);

        let mut streams = receivers
            .iter_mut()
            .map(|receiver| receiver.read(&provider, 0).unwrap())
            .collect::<Vec<_>>();
        distributor.distribute().await.unwrap();
        let entries = streams
            .get_mut(0)
            .unwrap()
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        assert_eq!(entries, vec![(1, put_wal_entry(1))]);
        let entries = streams
            .get_mut(1)
            .unwrap()
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        assert_eq!(entries, vec![(2, put_wal_entry(2))]);
    }

    #[tokio::test]
    async fn test_wal_entry_distributor_with_closed_receiver() {
        common_telemetry::init_default_ut_logging();
        let provider = Provider::kafka_provider("my_topic".to_string());
        let region1 = RegionId::new(1024, 1);
        let region2 = RegionId::new(1024, 2);
        let reader = Arc::new(MockRawEntryReader::new(vec![
            naive_put_entry(&provider, region1, 1, 1),
            naive_put_entry(&provider, region2, 2, 2),
            naive_put_entry(&provider, region1, 3, 3),
        ]));

        let (distributor, mut receivers) = build_wal_entry_distributor_and_receivers(
            provider.clone(),
            reader,
            &[region1, region2],
            128,
        );
        let mut streams = receivers
            .iter_mut()
            .map(|receiver| receiver.read(&provider, 0).unwrap())
            .collect::<Vec<_>>();
        // Closes region1's entry channel after it has already reported its start id.
        drop(streams.remove(0));

        // Distribution must still succeed and keep feeding the remaining region.
        distributor.distribute().await.unwrap();
        let entries = streams
            .get_mut(0)
            .unwrap()
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        assert_eq!(entries, vec![(2, put_wal_entry(2))]);
    }

    #[tokio::test]
    async fn test_tail_corrupted_stream() {
        common_telemetry::init_default_ut_logging();
        let mut entries = vec![];
        let region1 = RegionId::new(1, 1);
        let region1_expected_wal_entry = put_wal_entry(1);
        let region2 = RegionId::new(1, 2);
        let region2_expected_wal_entry = put_wal_entry(3);
        let region3 = RegionId::new(1, 3);
        let region3_expected_wal_entry = put_wal_entry(3);
        let provider = Provider::kafka_provider("my_topic".to_string());
        entries.extend(generate_tail_corrupted_stream(
            provider.clone(),
            region1,
            &region1_expected_wal_entry,
            3,
        ));
        entries.extend(generate_tail_corrupted_stream(
            provider.clone(),
            region2,
            &region2_expected_wal_entry,
            2,
        ));
        entries.extend(generate_tail_corrupted_stream(
            provider.clone(),
            region3,
            &region3_expected_wal_entry,
            4,
        ));

        let corrupted_stream = MockRawEntryReader { entries };
        // Builds distributor and receivers
        let (distributor, mut receivers) = build_wal_entry_distributor_and_receivers(
            provider.clone(),
            Arc::new(corrupted_stream),
            &[region1, region2, region3],
            128,
        );
        assert_eq!(receivers.len(), 3);
        let mut streams = receivers
            .iter_mut()
            .map(|receiver| receiver.read(&provider, 0).unwrap())
            .collect::<Vec<_>>();
        distributor.distribute().await.unwrap();

        assert_eq!(
            streams
                .get_mut(0)
                .unwrap()
                .try_collect::<Vec<_>>()
                .await
                .unwrap(),
            vec![(0, region1_expected_wal_entry)]
        );

        assert_eq!(
            streams
                .get_mut(1)
                .unwrap()
                .try_collect::<Vec<_>>()
                .await
                .unwrap(),
            vec![(0, region2_expected_wal_entry)]
        );

        assert_eq!(
            streams
                .get_mut(2)
                .unwrap()
                .try_collect::<Vec<_>>()
                .await
                .unwrap(),
            vec![(0, region3_expected_wal_entry)]
        );
    }

    #[tokio::test]
    async fn test_part_corrupted_stream() {
        common_telemetry::init_default_ut_logging();
        let mut entries = vec![];
        let region1 = RegionId::new(1, 1);
        let region1_expected_wal_entry = put_wal_entry(1);
        let region2 = RegionId::new(1, 2);
        let provider = Provider::kafka_provider("my_topic".to_string());
        entries.extend(generate_tail_corrupted_stream(
            provider.clone(),
            region1,
            &region1_expected_wal_entry,
            3,
        ));
        entries.extend(vec![
            // The incomplete entry.
            Entry::MultiplePart(MultiplePartEntry {
                provider: provider.clone(),
                region_id: region2,
                entry_id: 0,
                headers: vec![MultiplePartHeader::First],
                parts: vec![vec![1; 100]],
            }),
            // The incomplete entry.
            Entry::MultiplePart(MultiplePartEntry {
                provider: provider.clone(),
                region_id: region2,
                entry_id: 0,
                headers: vec![MultiplePartHeader::First],
                parts: vec![vec![1; 100]],
            }),
        ]);

        let corrupted_stream = MockRawEntryReader { entries };
        // Builds distributor and receivers
        let (distributor, mut receivers) = build_wal_entry_distributor_and_receivers(
            provider.clone(),
            Arc::new(corrupted_stream),
            &[region1, region2],
            128,
        );
        assert_eq!(receivers.len(), 2);
        let mut streams = receivers
            .iter_mut()
            .map(|receiver| receiver.read(&provider, 0).unwrap())
            .collect::<Vec<_>>();
        distributor.distribute().await.unwrap();
        assert_eq!(
            streams
                .get_mut(0)
                .unwrap()
                .try_collect::<Vec<_>>()
                .await
                .unwrap(),
            vec![(0, region1_expected_wal_entry)]
        );

        assert_eq!(
            streams
                .get_mut(1)
                .unwrap()
                .try_collect::<Vec<_>>()
                .await
                .unwrap(),
            vec![]
        );
    }

    #[tokio::test]
    async fn test_wal_entry_receiver_start_id() {
        let provider = Provider::kafka_provider("my_topic".to_string());
        let reader = Arc::new(MockRawEntryReader::new(vec![
            naive_put_entry(&provider, RegionId::new(1024, 1), 1, 1),
            naive_put_entry(&provider, RegionId::new(1024, 2), 2, 2),
            naive_put_entry(&provider, RegionId::new(1024, 1), 3, 3),
            naive_put_entry(&provider, RegionId::new(1024, 2), 4, 4),
        ]));

        // Builds distributor and receivers
        let (distributor, mut receivers) = build_wal_entry_distributor_and_receivers(
            provider.clone(),
            reader,
            &[RegionId::new(1024, 1), RegionId::new(1024, 2)],
            128,
        );
        assert_eq!(receivers.len(), 2);
        let mut streams = receivers
            .iter_mut()
            .map(|receiver| receiver.read(&provider, 4).unwrap())
            .collect::<Vec<_>>();
        distributor.distribute().await.unwrap();

        // Region (1024, 1) only has entries 1 and 3, both below the start id 4.
        let region1_entries = streams
            .get_mut(0)
            .unwrap()
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        assert!(region1_entries.is_empty());

        assert_eq!(
            streams
                .get_mut(1)
                .unwrap()
                .try_collect::<Vec<_>>()
                .await
                .unwrap(),
            vec![(4, put_wal_entry(4))]
        );
    }
}