mito2/
wal.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Write ahead log of the engine.
16
17pub(crate) mod entry_distributor;
18pub(crate) mod entry_reader;
19pub(crate) mod raw_entry_reader;
20
21use std::collections::HashMap;
22use std::mem;
23use std::sync::Arc;
24
25use api::v1::WalEntry;
26use common_error::ext::BoxedError;
27use common_telemetry::debug;
28use entry_reader::NoopEntryReader;
29use futures::future::BoxFuture;
30use futures::stream::BoxStream;
31use prost::Message;
32use snafu::ResultExt;
33use store_api::logstore::entry::Entry;
34use store_api::logstore::provider::Provider;
35use store_api::logstore::{AppendBatchResponse, LogStore, WalIndex};
36use store_api::storage::RegionId;
37
38use crate::error::{BuildEntrySnafu, DeleteWalSnafu, Result, WriteWalSnafu};
39use crate::wal::entry_reader::{LogStoreEntryReader, WalEntryReader};
40use crate::wal::raw_entry_reader::{LogStoreRawEntryReader, RegionRawEntryReader};
41
42/// WAL entry id.
43pub type EntryId = store_api::logstore::entry::Id;
44/// A stream that yields tuple of WAL entry id and corresponding entry.
45pub type WalEntryStream<'a> = BoxStream<'a, Result<(EntryId, WalEntry)>>;
46
/// The engine's write ahead log.
///
/// A single `Wal` instance is shared by all regions of the engine.
#[derive(Debug)]
pub struct Wal<S> {
    /// Shared handle to the log store backing this WAL.
    store: Arc<S>,
}
55
56impl<S> Wal<S> {
57    /// Creates a new [Wal] from the log store.
58    pub fn new(store: Arc<S>) -> Self {
59        Self { store }
60    }
61
62    pub fn store(&self) -> &Arc<S> {
63        &self.store
64    }
65}
66
67impl<S> Clone for Wal<S> {
68    fn clone(&self) -> Self {
69        Self {
70            store: self.store.clone(),
71        }
72    }
73}
74
75impl<S: LogStore> Wal<S> {
76    /// Returns a writer to write to the WAL.
77    pub fn writer(&self) -> WalWriter<S> {
78        WalWriter {
79            store: self.store.clone(),
80            entries: Vec::new(),
81            providers: HashMap::new(),
82        }
83    }
84
85    /// Returns a [OnRegionOpened] function.
86    pub(crate) fn on_region_opened(
87        &self,
88    ) -> impl FnOnce(RegionId, EntryId, &Provider) -> BoxFuture<Result<()>> {
89        let store = self.store.clone();
90        move |region_id, last_entry_id, provider| -> BoxFuture<'_, Result<()>> {
91            if let Provider::Noop = provider {
92                debug!("Skip obsolete for region: {}", region_id);
93                return Box::pin(async move { Ok(()) });
94            }
95            Box::pin(async move {
96                store
97                    .obsolete(provider, region_id, last_entry_id)
98                    .await
99                    .map_err(BoxedError::new)
100                    .context(DeleteWalSnafu { region_id })
101            })
102        }
103    }
104
105    /// Returns a [WalEntryReader]
106    pub(crate) fn wal_entry_reader(
107        &self,
108        provider: &Provider,
109        region_id: RegionId,
110        location_id: Option<u64>,
111    ) -> Box<dyn WalEntryReader> {
112        match provider {
113            Provider::RaftEngine(_) => Box::new(LogStoreEntryReader::new(
114                LogStoreRawEntryReader::new(self.store.clone()),
115            )),
116            Provider::Kafka(_) => {
117                let reader = if let Some(location_id) = location_id {
118                    LogStoreRawEntryReader::new(self.store.clone())
119                        .with_wal_index(WalIndex::new(region_id, location_id))
120                } else {
121                    LogStoreRawEntryReader::new(self.store.clone())
122                };
123
124                Box::new(LogStoreEntryReader::new(RegionRawEntryReader::new(
125                    reader, region_id,
126                )))
127            }
128            Provider::Noop => Box::new(NoopEntryReader),
129        }
130    }
131
132    /// Scan entries of specific region starting from `start_id` (inclusive).
133    /// Currently only used in tests.
134    pub fn scan<'a>(
135        &'a self,
136        region_id: RegionId,
137        start_id: EntryId,
138        provider: &'a Provider,
139    ) -> Result<WalEntryStream<'a>> {
140        match provider {
141            Provider::RaftEngine(_) => {
142                LogStoreEntryReader::new(LogStoreRawEntryReader::new(self.store.clone()))
143                    .read(provider, start_id)
144            }
145            Provider::Kafka(_) => LogStoreEntryReader::new(RegionRawEntryReader::new(
146                LogStoreRawEntryReader::new(self.store.clone()),
147                region_id,
148            ))
149            .read(provider, start_id),
150            Provider::Noop => Ok(Box::pin(futures::stream::empty())),
151        }
152    }
153
154    /// Mark entries whose ids `<= last_id` as deleted.
155    pub async fn obsolete(
156        &self,
157        region_id: RegionId,
158        last_id: EntryId,
159        provider: &Provider,
160    ) -> Result<()> {
161        if let Provider::Noop = provider {
162            return Ok(());
163        }
164        self.store
165            .obsolete(provider, region_id, last_id)
166            .await
167            .map_err(BoxedError::new)
168            .context(DeleteWalSnafu { region_id })
169    }
170}
171
172/// WAL batch writer.
173pub struct WalWriter<S: LogStore> {
174    /// Log store of the WAL.
175    store: Arc<S>,
176    /// Entries to write.
177    entries: Vec<Entry>,
178    /// Providers of regions being written into.
179    providers: HashMap<RegionId, Provider>,
180}
181
182impl<S: LogStore> WalWriter<S> {
183    /// Add a wal entry for specific region to the writer's buffer.
184    pub fn add_entry(
185        &mut self,
186        region_id: RegionId,
187        entry_id: EntryId,
188        wal_entry: &WalEntry,
189        provider: &Provider,
190    ) -> Result<()> {
191        // Gets or inserts with a newly built provider.
192        let provider = self
193            .providers
194            .entry(region_id)
195            .or_insert_with(|| provider.clone());
196
197        let data = wal_entry.encode_to_vec();
198        let entry = self
199            .store
200            .entry(data, entry_id, region_id, provider)
201            .map_err(BoxedError::new)
202            .context(BuildEntrySnafu { region_id })?;
203
204        self.entries.push(entry);
205
206        Ok(())
207    }
208
209    /// Write all buffered entries to the WAL.
210    pub async fn write_to_wal(&mut self) -> Result<AppendBatchResponse> {
211        // TODO(yingwen): metrics.
212
213        let entries = mem::take(&mut self.entries);
214        self.store
215            .append_batch(entries)
216            .await
217            .map_err(BoxedError::new)
218            .context(WriteWalSnafu)
219    }
220}
221
#[cfg(test)]
mod tests {
    use api::v1::{
        bulk_wal_entry, value, ArrowIpc, BulkWalEntry, ColumnDataType, ColumnSchema, Mutation,
        OpType, Row, Rows, SemanticType, Value,
    };
    use common_recordbatch::DfRecordBatch;
    use common_test_util::flight::encode_to_flight_data;
    use common_test_util::temp_dir::{create_temp_dir, TempDir};
    use datatypes::arrow;
    use datatypes::arrow::array::{ArrayRef, TimestampMillisecondArray};
    use datatypes::arrow::datatypes::Field;
    use datatypes::arrow_array::StringArray;
    use futures::TryStreamExt;
    use log_store::raft_engine::log_store::RaftEngineLogStore;
    use log_store::test_util::log_store_util;
    use store_api::storage::SequenceNumber;

    use super::*;

    /// Test fixture: a raft-engine log store created inside a temporary directory.
    struct WalEnv {
        /// Held so the temporary directory stays alive for the whole test
        /// (presumably cleaned up when dropped — see `TempDir`).
        _wal_dir: TempDir,
        log_store: Option<Arc<RaftEngineLogStore>>,
    }

    impl WalEnv {
        /// Creates a temporary directory and a local-file log store inside it.
        async fn new() -> WalEnv {
            let wal_dir = create_temp_dir("");
            let log_store =
                log_store_util::create_tmp_local_file_log_store(wal_dir.path().to_str().unwrap())
                    .await;
            WalEnv {
                _wal_dir: wal_dir,
                log_store: Some(Arc::new(log_store)),
            }
        }

        /// Returns a [Wal] sharing this environment's log store.
        fn new_wal(&self) -> Wal<RaftEngineLogStore> {
            let log_store = self.log_store.clone().unwrap();
            Wal::new(log_store)
        }
    }

    /// Create a new mutation from rows.
    ///
    /// Each row is `(tag, ts)`: a string tag column and a millisecond timestamp column.
    fn new_mutation(op_type: OpType, sequence: SequenceNumber, rows: &[(&str, i64)]) -> Mutation {
        let rows = rows
            .iter()
            .map(|(str_col, int_col)| {
                let values = vec![
                    Value {
                        value_data: Some(value::ValueData::StringValue(str_col.to_string())),
                    },
                    Value {
                        value_data: Some(value::ValueData::TimestampMillisecondValue(*int_col)),
                    },
                ];
                Row { values }
            })
            .collect();
        let schema = vec![
            ColumnSchema {
                column_name: "tag".to_string(),
                datatype: ColumnDataType::String as i32,
                semantic_type: SemanticType::Tag as i32,
                ..Default::default()
            },
            ColumnSchema {
                column_name: "ts".to_string(),
                datatype: ColumnDataType::TimestampMillisecond as i32,
                semantic_type: SemanticType::Timestamp as i32,
                ..Default::default()
            },
        ];

        Mutation {
            op_type: op_type as i32,
            sequence,
            rows: Some(Rows { schema, rows }),
            write_hint: None,
        }
    }

    #[tokio::test]
    async fn test_write_wal() {
        let env = WalEnv::new().await;
        let wal = env.new_wal();

        let entry = WalEntry {
            mutations: vec![
                new_mutation(OpType::Put, 1, &[("k1", 1), ("k2", 2)]),
                new_mutation(OpType::Put, 2, &[("k3", 3), ("k4", 4)]),
            ],
            bulk_entries: vec![],
        };
        let region1 = RegionId::new(1, 1);
        let region2 = RegionId::new(1, 2);
        let mut writer = wal.writer();
        // Region 1 entry 1.
        writer
            .add_entry(
                region1,
                1,
                &entry,
                &Provider::raft_engine_provider(region1.as_u64()),
            )
            .unwrap();
        // Region 2 entry 1.
        writer
            .add_entry(
                region2,
                1,
                &entry,
                &Provider::raft_engine_provider(region2.as_u64()),
            )
            .unwrap();
        // Region 1 entry 2.
        writer
            .add_entry(
                region1,
                2,
                &entry,
                &Provider::raft_engine_provider(region1.as_u64()),
            )
            .unwrap();

        // Test writing multiple regions to the wal in one batch.
        writer.write_to_wal().await.unwrap();
    }

    /// Builds a two-column record batch (`tag`: Utf8, `ts`: millisecond timestamp) from rows.
    fn build_record_batch(rows: &[(&str, i64)]) -> DfRecordBatch {
        let schema = Arc::new(arrow::datatypes::Schema::new(vec![
            Field::new("tag", arrow::datatypes::DataType::Utf8, false),
            Field::new(
                "ts",
                arrow::datatypes::DataType::Timestamp(
                    arrow::datatypes::TimeUnit::Millisecond,
                    None,
                ),
                false,
            ),
        ]));

        let tag = Arc::new(StringArray::from_iter_values(
            rows.iter().map(|r| r.0.to_string()),
        )) as ArrayRef;
        let ts = Arc::new(TimestampMillisecondArray::from_iter_values(
            rows.iter().map(|r| r.1),
        )) as ArrayRef;
        DfRecordBatch::try_new(schema, vec![tag, ts]).unwrap()
    }

    /// Builds a [BulkWalEntry] whose body is the Arrow IPC encoding of `rows`.
    ///
    /// Panics if `rows` is empty (the min/max timestamps are unwrapped).
    fn build_bulk_wal_entry(sequence_number: SequenceNumber, rows: &[(&str, i64)]) -> BulkWalEntry {
        let rb = build_record_batch(rows);
        let (schema, rb) = encode_to_flight_data(rb);
        let max_ts = rows.iter().map(|r| r.1).max().unwrap();
        let min_ts = rows.iter().map(|r| r.1).min().unwrap();
        BulkWalEntry {
            sequence: sequence_number,
            max_ts,
            min_ts,
            // `ts` is the second column of the batch built by `build_record_batch`.
            timestamp_index: 1,
            body: Some(bulk_wal_entry::Body::ArrowIpc(ArrowIpc {
                schema: schema.data_header,
                data_header: rb.data_header,
                payload: rb.data_body,
            })),
        }
    }

    /// Four sample WAL entries; the last one also carries a bulk entry.
    fn sample_entries() -> Vec<WalEntry> {
        vec![
            WalEntry {
                mutations: vec![
                    new_mutation(OpType::Put, 1, &[("k1", 1), ("k2", 2)]),
                    new_mutation(OpType::Put, 2, &[("k3", 3), ("k4", 4)]),
                ],
                bulk_entries: vec![],
            },
            WalEntry {
                mutations: vec![new_mutation(OpType::Put, 3, &[("k1", 1), ("k2", 2)])],
                bulk_entries: vec![],
            },
            WalEntry {
                mutations: vec![
                    new_mutation(OpType::Put, 4, &[("k1", 1), ("k2", 2)]),
                    new_mutation(OpType::Put, 5, &[("k3", 3), ("k4", 4)]),
                ],
                bulk_entries: vec![],
            },
            WalEntry {
                mutations: vec![new_mutation(OpType::Put, 6, &[("k1", 1), ("k2", 2)])],
                bulk_entries: vec![build_bulk_wal_entry(7, &[("k1", 8), ("k2", 9)])],
            },
        ]
    }

    /// Asserts that `actual` equals `expect`, with ids consecutive from `expect_start_id`.
    fn check_entries(
        expect: &[WalEntry],
        expect_start_id: EntryId,
        actual: &[(EntryId, WalEntry)],
    ) {
        // Check the length first so a missing/extra entry fails with a clear message.
        assert_eq!(expect.len(), actual.len());
        for (idx, (expect_entry, (actual_id, actual_entry))) in
            expect.iter().zip(actual.iter()).enumerate()
        {
            let expect_id_entry = (expect_start_id + idx as u64, expect_entry);
            assert_eq!(expect_id_entry, (*actual_id, actual_entry));
        }
    }

    #[tokio::test]
    async fn test_scan_wal() {
        let env = WalEnv::new().await;
        let wal = env.new_wal();

        let entries = sample_entries();
        let (id1, id2) = (RegionId::new(1, 1), RegionId::new(1, 2));
        let ns1 = Provider::raft_engine_provider(id1.as_u64());
        let ns2 = Provider::raft_engine_provider(id2.as_u64());
        let mut writer = wal.writer();
        writer.add_entry(id1, 1, &entries[0], &ns1).unwrap();
        // Insert one entry into region2. Scan should not return this entry.
        writer.add_entry(id2, 1, &entries[0], &ns2).unwrap();
        writer.add_entry(id1, 2, &entries[1], &ns1).unwrap();
        writer.add_entry(id1, 3, &entries[2], &ns1).unwrap();
        writer.add_entry(id1, 4, &entries[3], &ns1).unwrap();

        writer.write_to_wal().await.unwrap();

        // Scan all contents of region 1.
        let stream = wal.scan(id1, 1, &ns1).unwrap();
        let actual: Vec<_> = stream.try_collect().await.unwrap();
        check_entries(&entries, 1, &actual);

        // Scan parts of contents
        let stream = wal.scan(id1, 2, &ns1).unwrap();
        let actual: Vec<_> = stream.try_collect().await.unwrap();
        check_entries(&entries[1..], 2, &actual);

        // Scan out of range
        let stream = wal.scan(id1, 5, &ns1).unwrap();
        let actual: Vec<_> = stream.try_collect().await.unwrap();
        assert!(actual.is_empty());
    }

    #[tokio::test]
    async fn test_obsolete_wal() {
        let env = WalEnv::new().await;
        let wal = env.new_wal();

        let entries = sample_entries();
        let mut writer = wal.writer();
        let region_id = RegionId::new(1, 1);
        let ns = Provider::raft_engine_provider(region_id.as_u64());
        writer.add_entry(region_id, 1, &entries[0], &ns).unwrap();
        writer.add_entry(region_id, 2, &entries[1], &ns).unwrap();
        writer.add_entry(region_id, 3, &entries[2], &ns).unwrap();

        writer.write_to_wal().await.unwrap();

        // Delete 1, 2.
        wal.obsolete(region_id, 2, &ns).await.unwrap();

        // Put 4.
        let mut writer = wal.writer();
        writer.add_entry(region_id, 4, &entries[3], &ns).unwrap();
        writer.write_to_wal().await.unwrap();

        // Scan all
        let stream = wal.scan(region_id, 1, &ns).unwrap();
        let actual: Vec<_> = stream.try_collect().await.unwrap();
        check_entries(&entries[2..], 3, &actual);
    }
}