mito2/
wal.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Write ahead log of the engine.
16
17pub(crate) mod entry_distributor;
18pub(crate) mod entry_reader;
19pub(crate) mod raw_entry_reader;
20
21use std::collections::HashMap;
22use std::mem;
23use std::sync::Arc;
24
25use api::v1::WalEntry;
26use common_error::ext::BoxedError;
27use common_telemetry::debug;
28use entry_reader::NoopEntryReader;
29use futures::future::BoxFuture;
30use futures::stream::BoxStream;
31use prost::Message;
32use snafu::ResultExt;
33use store_api::logstore::entry::Entry;
34use store_api::logstore::provider::Provider;
35use store_api::logstore::{AppendBatchResponse, LogStore, WalIndex};
36use store_api::storage::RegionId;
37
38use crate::error::{BuildEntrySnafu, DeleteWalSnafu, EncodeWalSnafu, Result, WriteWalSnafu};
39use crate::wal::entry_reader::{LogStoreEntryReader, WalEntryReader};
40use crate::wal::raw_entry_reader::{LogStoreRawEntryReader, RegionRawEntryReader};
41
42/// WAL entry id.
43pub type EntryId = store_api::logstore::entry::Id;
44/// A stream that yields tuple of WAL entry id and corresponding entry.
45pub type WalEntryStream<'a> = BoxStream<'a, Result<(EntryId, WalEntry)>>;
46
/// Write-ahead log of the engine.
///
/// All regions in the engine share the same WAL instance.
#[derive(Debug)]
pub struct Wal<S> {
    /// Shared handle to the underlying log store.
    store: Arc<S>,
}
55
56impl<S> Wal<S> {
57    /// Creates a new [Wal] from the log store.
58    pub fn new(store: Arc<S>) -> Self {
59        Self { store }
60    }
61
62    pub fn store(&self) -> &Arc<S> {
63        &self.store
64    }
65}
66
67impl<S> Clone for Wal<S> {
68    fn clone(&self) -> Self {
69        Self {
70            store: self.store.clone(),
71        }
72    }
73}
74
75impl<S: LogStore> Wal<S> {
76    /// Returns a writer to write to the WAL.
77    pub fn writer(&self) -> WalWriter<S> {
78        WalWriter {
79            store: self.store.clone(),
80            entries: Vec::new(),
81            entry_encode_buf: Vec::new(),
82            providers: HashMap::new(),
83        }
84    }
85
86    /// Returns a [OnRegionOpened] function.
87    pub(crate) fn on_region_opened(
88        &self,
89    ) -> impl FnOnce(RegionId, EntryId, &Provider) -> BoxFuture<Result<()>> {
90        let store = self.store.clone();
91        move |region_id, last_entry_id, provider| -> BoxFuture<'_, Result<()>> {
92            if let Provider::Noop = provider {
93                debug!("Skip obsolete for region: {}", region_id);
94                return Box::pin(async move { Ok(()) });
95            }
96            Box::pin(async move {
97                store
98                    .obsolete(provider, region_id, last_entry_id)
99                    .await
100                    .map_err(BoxedError::new)
101                    .context(DeleteWalSnafu { region_id })
102            })
103        }
104    }
105
106    /// Returns a [WalEntryReader]
107    pub(crate) fn wal_entry_reader(
108        &self,
109        provider: &Provider,
110        region_id: RegionId,
111        location_id: Option<u64>,
112    ) -> Box<dyn WalEntryReader> {
113        match provider {
114            Provider::RaftEngine(_) => Box::new(LogStoreEntryReader::new(
115                LogStoreRawEntryReader::new(self.store.clone()),
116            )),
117            Provider::Kafka(_) => {
118                let reader = if let Some(location_id) = location_id {
119                    LogStoreRawEntryReader::new(self.store.clone())
120                        .with_wal_index(WalIndex::new(region_id, location_id))
121                } else {
122                    LogStoreRawEntryReader::new(self.store.clone())
123                };
124
125                Box::new(LogStoreEntryReader::new(RegionRawEntryReader::new(
126                    reader, region_id,
127                )))
128            }
129            Provider::Noop => Box::new(NoopEntryReader),
130        }
131    }
132
133    /// Scan entries of specific region starting from `start_id` (inclusive).
134    /// Currently only used in tests.
135    pub fn scan<'a>(
136        &'a self,
137        region_id: RegionId,
138        start_id: EntryId,
139        provider: &'a Provider,
140    ) -> Result<WalEntryStream<'a>> {
141        match provider {
142            Provider::RaftEngine(_) => {
143                LogStoreEntryReader::new(LogStoreRawEntryReader::new(self.store.clone()))
144                    .read(provider, start_id)
145            }
146            Provider::Kafka(_) => LogStoreEntryReader::new(RegionRawEntryReader::new(
147                LogStoreRawEntryReader::new(self.store.clone()),
148                region_id,
149            ))
150            .read(provider, start_id),
151            Provider::Noop => Ok(Box::pin(futures::stream::empty())),
152        }
153    }
154
155    /// Mark entries whose ids `<= last_id` as deleted.
156    pub async fn obsolete(
157        &self,
158        region_id: RegionId,
159        last_id: EntryId,
160        provider: &Provider,
161    ) -> Result<()> {
162        if let Provider::Noop = provider {
163            return Ok(());
164        }
165        self.store
166            .obsolete(provider, region_id, last_id)
167            .await
168            .map_err(BoxedError::new)
169            .context(DeleteWalSnafu { region_id })
170    }
171}
172
173/// WAL batch writer.
174pub struct WalWriter<S: LogStore> {
175    /// Log store of the WAL.
176    store: Arc<S>,
177    /// Entries to write.
178    entries: Vec<Entry>,
179    /// Buffer to encode WAL entry.
180    entry_encode_buf: Vec<u8>,
181    /// Providers of regions being written into.
182    providers: HashMap<RegionId, Provider>,
183}
184
185impl<S: LogStore> WalWriter<S> {
186    /// Add a wal entry for specific region to the writer's buffer.
187    pub fn add_entry(
188        &mut self,
189        region_id: RegionId,
190        entry_id: EntryId,
191        wal_entry: &WalEntry,
192        provider: &Provider,
193    ) -> Result<()> {
194        // Gets or inserts with a newly built provider.
195        let provider = self
196            .providers
197            .entry(region_id)
198            .or_insert_with(|| provider.clone());
199
200        // Encode wal entry to log store entry.
201        self.entry_encode_buf.clear();
202        wal_entry
203            .encode(&mut self.entry_encode_buf)
204            .context(EncodeWalSnafu { region_id })?;
205        let entry = self
206            .store
207            .entry(&mut self.entry_encode_buf, entry_id, region_id, provider)
208            .map_err(BoxedError::new)
209            .context(BuildEntrySnafu { region_id })?;
210
211        self.entries.push(entry);
212
213        Ok(())
214    }
215
216    /// Write all buffered entries to the WAL.
217    pub async fn write_to_wal(&mut self) -> Result<AppendBatchResponse> {
218        // TODO(yingwen): metrics.
219
220        let entries = mem::take(&mut self.entries);
221        self.store
222            .append_batch(entries)
223            .await
224            .map_err(BoxedError::new)
225            .context(WriteWalSnafu)
226    }
227}
228
#[cfg(test)]
mod tests {
    use api::v1::{
        value, ColumnDataType, ColumnSchema, Mutation, OpType, Row, Rows, SemanticType, Value,
    };
    use common_test_util::temp_dir::{create_temp_dir, TempDir};
    use futures::TryStreamExt;
    use log_store::raft_engine::log_store::RaftEngineLogStore;
    use log_store::test_util::log_store_util;
    use store_api::storage::SequenceNumber;

    use super::*;

    /// Test environment that owns a temporary WAL directory and a raft-engine
    /// log store created in it.
    struct WalEnv {
        /// Held so the temporary directory lives as long as the environment.
        _wal_dir: TempDir,
        /// Log store shared by every [Wal] created from this environment.
        log_store: Arc<RaftEngineLogStore>,
    }

    impl WalEnv {
        /// Creates a temporary directory and a log store inside it.
        async fn new() -> WalEnv {
            let wal_dir = create_temp_dir("");
            let log_store =
                log_store_util::create_tmp_local_file_log_store(wal_dir.path().to_str().unwrap())
                    .await;
            WalEnv {
                _wal_dir: wal_dir,
                log_store: Arc::new(log_store),
            }
        }

        /// Creates a [Wal] backed by the environment's log store.
        fn new_wal(&self) -> Wal<RaftEngineLogStore> {
            Wal::new(self.log_store.clone())
        }
    }

    /// Create a new mutation from rows.
    ///
    /// The row format is (string, i64): the string fills the `tag` column and
    /// the integer fills the `ts` column as a millisecond timestamp.
    fn new_mutation(op_type: OpType, sequence: SequenceNumber, rows: &[(&str, i64)]) -> Mutation {
        let rows = rows
            .iter()
            .map(|(str_col, int_col)| {
                let values = vec![
                    Value {
                        value_data: Some(value::ValueData::StringValue(str_col.to_string())),
                    },
                    Value {
                        value_data: Some(value::ValueData::TimestampMillisecondValue(*int_col)),
                    },
                ];
                Row { values }
            })
            .collect();
        let schema = vec![
            ColumnSchema {
                column_name: "tag".to_string(),
                datatype: ColumnDataType::String as i32,
                semantic_type: SemanticType::Tag as i32,
                ..Default::default()
            },
            ColumnSchema {
                column_name: "ts".to_string(),
                datatype: ColumnDataType::TimestampMillisecond as i32,
                semantic_type: SemanticType::Timestamp as i32,
                ..Default::default()
            },
        ];

        Mutation {
            op_type: op_type as i32,
            sequence,
            rows: Some(Rows { schema, rows }),
            write_hint: None,
        }
    }

    #[tokio::test]
    async fn test_write_wal() {
        let env = WalEnv::new().await;
        let wal = env.new_wal();

        let entry = WalEntry {
            mutations: vec![
                new_mutation(OpType::Put, 1, &[("k1", 1), ("k2", 2)]),
                new_mutation(OpType::Put, 2, &[("k3", 3), ("k4", 4)]),
            ],
        };

        let region1 = RegionId::new(1, 1);
        let region2 = RegionId::new(1, 2);
        let provider1 = Provider::raft_engine_provider(region1.as_u64());
        let provider2 = Provider::raft_engine_provider(region2.as_u64());

        let mut writer = wal.writer();
        // Region 1 entry 1.
        writer.add_entry(region1, 1, &entry, &provider1).unwrap();
        // Region 2 entry 1.
        writer.add_entry(region2, 1, &entry, &provider2).unwrap();
        // Region 1 entry 2. This must target region 1 again so that the batch
        // interleaves entries of both regions.
        writer.add_entry(region1, 2, &entry, &provider1).unwrap();

        // Test writing multiple region to wal.
        writer.write_to_wal().await.unwrap();
    }

    /// Returns four WAL entries whose mutations carry sequences 1 to 6.
    fn sample_entries() -> Vec<WalEntry> {
        vec![
            WalEntry {
                mutations: vec![
                    new_mutation(OpType::Put, 1, &[("k1", 1), ("k2", 2)]),
                    new_mutation(OpType::Put, 2, &[("k3", 3), ("k4", 4)]),
                ],
            },
            WalEntry {
                mutations: vec![new_mutation(OpType::Put, 3, &[("k1", 1), ("k2", 2)])],
            },
            WalEntry {
                mutations: vec![
                    new_mutation(OpType::Put, 4, &[("k1", 1), ("k2", 2)]),
                    new_mutation(OpType::Put, 5, &[("k3", 3), ("k4", 4)]),
                ],
            },
            WalEntry {
                mutations: vec![new_mutation(OpType::Put, 6, &[("k1", 1), ("k2", 2)])],
            },
        ]
    }

    /// Asserts that `actual` holds exactly the `expect` entries, with ids that
    /// increase by one starting from `expect_start_id`.
    fn check_entries(
        expect: &[WalEntry],
        expect_start_id: EntryId,
        actual: &[(EntryId, WalEntry)],
    ) {
        // Check the length first: `zip` below would silently stop at the
        // shorter of the two slices.
        assert_eq!(expect.len(), actual.len());
        for (idx, (expect_entry, (actual_id, actual_entry))) in
            expect.iter().zip(actual.iter()).enumerate()
        {
            let expect_id_entry = (expect_start_id + idx as u64, expect_entry);
            assert_eq!(expect_id_entry, (*actual_id, actual_entry));
        }
    }

    #[tokio::test]
    async fn test_scan_wal() {
        let env = WalEnv::new().await;
        let wal = env.new_wal();

        let entries = sample_entries();
        let (id1, id2) = (RegionId::new(1, 1), RegionId::new(1, 2));
        let ns1 = Provider::raft_engine_provider(id1.as_u64());
        let ns2 = Provider::raft_engine_provider(id2.as_u64());
        let mut writer = wal.writer();
        writer.add_entry(id1, 1, &entries[0], &ns1).unwrap();
        // Insert one entry into region2. Scan should not return this entry.
        writer.add_entry(id2, 1, &entries[0], &ns2).unwrap();
        writer.add_entry(id1, 2, &entries[1], &ns1).unwrap();
        writer.add_entry(id1, 3, &entries[2], &ns1).unwrap();
        writer.add_entry(id1, 4, &entries[3], &ns1).unwrap();

        writer.write_to_wal().await.unwrap();

        // Scan all contents region1
        let stream = wal.scan(id1, 1, &ns1).unwrap();
        let actual: Vec<_> = stream.try_collect().await.unwrap();
        check_entries(&entries, 1, &actual);

        // Scan parts of contents
        let stream = wal.scan(id1, 2, &ns1).unwrap();
        let actual: Vec<_> = stream.try_collect().await.unwrap();
        check_entries(&entries[1..], 2, &actual);

        // Scan out of range
        let stream = wal.scan(id1, 5, &ns1).unwrap();
        let actual: Vec<_> = stream.try_collect().await.unwrap();
        assert!(actual.is_empty());
    }

    #[tokio::test]
    async fn test_obsolete_wal() {
        let env = WalEnv::new().await;
        let wal = env.new_wal();

        let entries = sample_entries();
        let mut writer = wal.writer();
        let region_id = RegionId::new(1, 1);
        let ns = Provider::raft_engine_provider(region_id.as_u64());
        writer.add_entry(region_id, 1, &entries[0], &ns).unwrap();
        writer.add_entry(region_id, 2, &entries[1], &ns).unwrap();
        writer.add_entry(region_id, 3, &entries[2], &ns).unwrap();

        writer.write_to_wal().await.unwrap();

        // Delete 1, 2.
        wal.obsolete(region_id, 2, &ns).await.unwrap();

        // Put 4.
        let mut writer = wal.writer();
        writer.add_entry(region_id, 4, &entries[3], &ns).unwrap();
        writer.write_to_wal().await.unwrap();

        // Scan all
        let stream = wal.scan(region_id, 1, &ns).unwrap();
        let actual: Vec<_> = stream.try_collect().await.unwrap();
        check_entries(&entries[2..], 3, &actual);
    }
}