mito2/wal.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Write ahead log of the engine.

pub(crate) mod entry_distributor;
pub(crate) mod entry_reader;
pub(crate) mod raw_entry_reader;

use std::collections::HashMap;
use std::mem;
use std::sync::Arc;

use api::v1::WalEntry;
use common_error::ext::BoxedError;
use common_telemetry::debug;
use entry_reader::NoopEntryReader;
use futures::future::BoxFuture;
use futures::stream::BoxStream;
use prost::Message;
use snafu::ResultExt;
use store_api::logstore::entry::Entry;
use store_api::logstore::provider::Provider;
use store_api::logstore::{AppendBatchResponse, LogStore, WalIndex};
use store_api::storage::RegionId;

use crate::error::{BuildEntrySnafu, DeleteWalSnafu, Result, WriteWalSnafu};
use crate::wal::entry_reader::{LogStoreEntryReader, WalEntryReader};
use crate::wal::raw_entry_reader::{LogStoreRawEntryReader, RegionRawEntryReader};

/// WAL entry id.
pub type EntryId = store_api::logstore::entry::Id;
/// A stream that yields tuples of WAL entry id and the corresponding entry.
pub type WalEntryStream<'a> = BoxStream<'a, Result<(EntryId, WalEntry)>>;

/// Write ahead log.
///
/// All regions in the engine share the same WAL instance.
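///
/// A minimal usage sketch (for illustration only; it mirrors the tests below and
/// assumes a concrete `LogStore` such as `RaftEngineLogStore`):
///
/// ```ignore
/// let wal = Wal::new(Arc::new(log_store));
/// let provider = Provider::raft_engine_provider(region_id.as_u64());
/// let mut writer = wal.writer();
/// writer.add_entry(region_id, 1, &wal_entry, &provider)?;
/// writer.write_to_wal().await?;
/// ```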
#[derive(Debug)]
pub struct Wal<S> {
    /// The underlying log store.
    store: Arc<S>,
}

impl<S> Wal<S> {
    /// Creates a new [Wal] from the log store.
    pub fn new(store: Arc<S>) -> Self {
        Self { store }
    }

    pub fn store(&self) -> &Arc<S> {
        &self.store
    }
}

impl<S> Clone for Wal<S> {
    fn clone(&self) -> Self {
        Self {
            store: self.store.clone(),
        }
    }
}

impl<S: LogStore> Wal<S> {
    /// Returns a writer to write to the WAL.
    pub fn writer(&self) -> WalWriter<S> {
        WalWriter {
            store: self.store.clone(),
            entries: Vec::new(),
            providers: HashMap::new(),
        }
    }

    /// Returns an [OnRegionOpened] function.
    ///
    /// The returned callback marks the region's WAL entries up to `last_entry_id`
    /// (inclusive) as obsolete once the region is opened.
    pub(crate) fn on_region_opened(
        &self,
    ) -> impl FnOnce(RegionId, EntryId, &Provider) -> BoxFuture<Result<()>> {
        let store = self.store.clone();
        move |region_id, last_entry_id, provider| -> BoxFuture<'_, Result<()>> {
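            // Nothing to obsolete for the no-op provider, so skip the log store call.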
            if let Provider::Noop = provider {
                debug!("Skip obsolete for region: {}", region_id);
                return Box::pin(async move { Ok(()) });
            }
            Box::pin(async move {
                store
                    .obsolete(provider, region_id, last_entry_id)
                    .await
                    .map_err(BoxedError::new)
                    .context(DeleteWalSnafu { region_id })
            })
        }
    }

    /// Returns a [WalEntryReader].
    pub(crate) fn wal_entry_reader(
        &self,
        provider: &Provider,
        region_id: RegionId,
        location_id: Option<u64>,
    ) -> Box<dyn WalEntryReader> {
        match provider {
            Provider::RaftEngine(_) => Box::new(LogStoreEntryReader::new(
                LogStoreRawEntryReader::new(self.store.clone()),
            )),
            Provider::Kafka(_) => {
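                // For Kafka, wrap the raw reader so that only entries belonging to this
                // region are read; use the WAL index when a location id is provided.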
                let reader = if let Some(location_id) = location_id {
                    LogStoreRawEntryReader::new(self.store.clone())
                        .with_wal_index(WalIndex::new(region_id, location_id))
                } else {
                    LogStoreRawEntryReader::new(self.store.clone())
                };

                Box::new(LogStoreEntryReader::new(RegionRawEntryReader::new(
                    reader, region_id,
                )))
            }
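            // The no-op provider has no WAL entries to replay.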
            Provider::Noop => Box::new(NoopEntryReader),
        }
    }

    /// Scan entries of a specific region starting from `start_id` (inclusive).
    /// Currently only used in tests.
    pub fn scan<'a>(
        &'a self,
        region_id: RegionId,
        start_id: EntryId,
        provider: &'a Provider,
    ) -> Result<WalEntryStream<'a>> {
        match provider {
            Provider::RaftEngine(_) => {
                LogStoreEntryReader::new(LogStoreRawEntryReader::new(self.store.clone()))
                    .read(provider, start_id)
            }
            Provider::Kafka(_) => LogStoreEntryReader::new(RegionRawEntryReader::new(
                LogStoreRawEntryReader::new(self.store.clone()),
                region_id,
            ))
            .read(provider, start_id),
            Provider::Noop => Ok(Box::pin(futures::stream::empty())),
        }
    }

    /// Mark entries whose ids `<= last_id` as deleted.
    pub async fn obsolete(
        &self,
        region_id: RegionId,
        last_id: EntryId,
        provider: &Provider,
    ) -> Result<()> {
        if let Provider::Noop = provider {
            return Ok(());
        }
        self.store
            .obsolete(provider, region_id, last_id)
            .await
            .map_err(BoxedError::new)
            .context(DeleteWalSnafu { region_id })
    }
}

/// WAL batch writer.
pub struct WalWriter<S: LogStore> {
    /// Log store of the WAL.
    store: Arc<S>,
    /// Entries to write.
    entries: Vec<Entry>,
    /// Providers of regions being written into.
    providers: HashMap<RegionId, Provider>,
}

impl<S: LogStore> WalWriter<S> {
    /// Add a WAL entry for a specific region to the writer's buffer.
    pub fn add_entry(
        &mut self,
        region_id: RegionId,
        entry_id: EntryId,
        wal_entry: &WalEntry,
        provider: &Provider,
    ) -> Result<()> {
        // Gets or inserts with a newly built provider.
        let provider = self
            .providers
            .entry(region_id)
            .or_insert_with(|| provider.clone());

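        // Encode the entry to protobuf bytes before handing it to the log store.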
        let data = wal_entry.encode_to_vec();
        let entry = self
            .store
            .entry(data, entry_id, region_id, provider)
            .map_err(BoxedError::new)
            .context(BuildEntrySnafu { region_id })?;

        self.entries.push(entry);

        Ok(())
    }

    /// Write all buffered entries to the WAL.
    pub async fn write_to_wal(&mut self) -> Result<AppendBatchResponse> {
        // TODO(yingwen): metrics.

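        // Take the buffered entries, leaving the writer's buffer empty for reuse.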
        let entries = mem::take(&mut self.entries);
        self.store
            .append_batch(entries)
            .await
            .map_err(BoxedError::new)
            .context(WriteWalSnafu)
    }
}

#[cfg(test)]
mod tests {
    use api::v1::helper::{tag_column_schema, time_index_column_schema};
    use api::v1::{
        bulk_wal_entry, value, ArrowIpc, BulkWalEntry, ColumnDataType, Mutation, OpType, Row, Rows,
        Value,
    };
    use common_recordbatch::DfRecordBatch;
    use common_test_util::flight::encode_to_flight_data;
    use common_test_util::temp_dir::{create_temp_dir, TempDir};
    use datatypes::arrow;
    use datatypes::arrow::array::{ArrayRef, TimestampMillisecondArray};
    use datatypes::arrow::datatypes::Field;
    use datatypes::arrow_array::StringArray;
    use futures::TryStreamExt;
    use log_store::raft_engine::log_store::RaftEngineLogStore;
    use log_store::test_util::log_store_util;
    use store_api::storage::SequenceNumber;

    use super::*;

    struct WalEnv {
        _wal_dir: TempDir,
        log_store: Option<Arc<RaftEngineLogStore>>,
    }

    impl WalEnv {
        async fn new() -> WalEnv {
            let wal_dir = create_temp_dir("");
            let log_store =
                log_store_util::create_tmp_local_file_log_store(wal_dir.path().to_str().unwrap())
                    .await;
            WalEnv {
                _wal_dir: wal_dir,
                log_store: Some(Arc::new(log_store)),
            }
        }

        fn new_wal(&self) -> Wal<RaftEngineLogStore> {
            let log_store = self.log_store.clone().unwrap();
            Wal::new(log_store)
        }
    }

    /// Create a new mutation from rows.
    ///
    /// The row format is (string, i64).
    fn new_mutation(op_type: OpType, sequence: SequenceNumber, rows: &[(&str, i64)]) -> Mutation {
        let rows = rows
            .iter()
            .map(|(str_col, int_col)| {
                let values = vec![
                    Value {
                        value_data: Some(value::ValueData::StringValue(str_col.to_string())),
                    },
                    Value {
                        value_data: Some(value::ValueData::TimestampMillisecondValue(*int_col)),
                    },
                ];
                Row { values }
            })
            .collect();
        let schema = vec![
            tag_column_schema("tag", ColumnDataType::String),
            time_index_column_schema("ts", ColumnDataType::TimestampMillisecond),
        ];

        Mutation {
            op_type: op_type as i32,
            sequence,
            rows: Some(Rows { schema, rows }),
            write_hint: None,
        }
    }

    #[tokio::test]
    async fn test_write_wal() {
        let env = WalEnv::new().await;
        let wal = env.new_wal();

        let entry = WalEntry {
            mutations: vec![
                new_mutation(OpType::Put, 1, &[("k1", 1), ("k2", 2)]),
                new_mutation(OpType::Put, 2, &[("k3", 3), ("k4", 4)]),
            ],
            bulk_entries: vec![],
        };
        let mut writer = wal.writer();
        // Region 1 entry 1.
        let region_id = RegionId::new(1, 1);
        writer
            .add_entry(
                region_id,
                1,
                &entry,
                &Provider::raft_engine_provider(region_id.as_u64()),
            )
            .unwrap();
        // Region 2 entry 1.
        let region_id = RegionId::new(1, 2);
        writer
            .add_entry(
                region_id,
                1,
                &entry,
                &Provider::raft_engine_provider(region_id.as_u64()),
            )
            .unwrap();
        // Region 1 entry 2.
        let region_id = RegionId::new(1, 1);
        writer
            .add_entry(
                region_id,
                2,
                &entry,
                &Provider::raft_engine_provider(region_id.as_u64()),
            )
            .unwrap();

        // Test writing multiple regions to the WAL.
        writer.write_to_wal().await.unwrap();
    }

    fn build_record_batch(rows: &[(&str, i64)]) -> DfRecordBatch {
        let schema = Arc::new(arrow::datatypes::Schema::new(vec![
            Field::new("tag", arrow::datatypes::DataType::Utf8, false),
            Field::new(
                "ts",
                arrow::datatypes::DataType::Timestamp(
                    arrow::datatypes::TimeUnit::Millisecond,
                    None,
                ),
                false,
            ),
        ]));

        let tag = Arc::new(StringArray::from_iter_values(
            rows.iter().map(|r| r.0.to_string()),
        )) as ArrayRef;
        let ts = Arc::new(TimestampMillisecondArray::from_iter_values(
            rows.iter().map(|r| r.1),
        )) as ArrayRef;
        DfRecordBatch::try_new(schema, vec![tag, ts]).unwrap()
    }

    fn build_bulk_wal_entry(sequence_number: SequenceNumber, rows: &[(&str, i64)]) -> BulkWalEntry {
        let rb = build_record_batch(rows);
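        // Encode the record batch as Arrow Flight data; the schema and data messages
        // become the Arrow IPC body of the bulk entry.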
        let (schema, rb) = encode_to_flight_data(rb);
        let max_ts = rows.iter().map(|r| r.1).max().unwrap();
        let min_ts = rows.iter().map(|r| r.1).min().unwrap();
        BulkWalEntry {
            sequence: sequence_number,
            max_ts,
            min_ts,
            timestamp_index: 1,
            body: Some(bulk_wal_entry::Body::ArrowIpc(ArrowIpc {
                schema: schema.data_header,
                data_header: rb.data_header,
                payload: rb.data_body,
            })),
        }
    }

    fn sample_entries() -> Vec<WalEntry> {
        vec![
            WalEntry {
                mutations: vec![
                    new_mutation(OpType::Put, 1, &[("k1", 1), ("k2", 2)]),
                    new_mutation(OpType::Put, 2, &[("k3", 3), ("k4", 4)]),
                ],
                bulk_entries: vec![],
            },
            WalEntry {
                mutations: vec![new_mutation(OpType::Put, 3, &[("k1", 1), ("k2", 2)])],
                bulk_entries: vec![],
            },
            WalEntry {
                mutations: vec![
                    new_mutation(OpType::Put, 4, &[("k1", 1), ("k2", 2)]),
                    new_mutation(OpType::Put, 5, &[("k3", 3), ("k4", 4)]),
                ],
                bulk_entries: vec![],
            },
            WalEntry {
                mutations: vec![new_mutation(OpType::Put, 6, &[("k1", 1), ("k2", 2)])],
                bulk_entries: vec![build_bulk_wal_entry(7, &[("k1", 8), ("k2", 9)])],
            },
        ]
    }

    fn check_entries(
        expect: &[WalEntry],
        expect_start_id: EntryId,
        actual: &[(EntryId, WalEntry)],
    ) {
        for (idx, (expect_entry, (actual_id, actual_entry))) in
            expect.iter().zip(actual.iter()).enumerate()
        {
            let expect_id_entry = (expect_start_id + idx as u64, expect_entry);
            assert_eq!(expect_id_entry, (*actual_id, actual_entry));
        }
        assert_eq!(expect.len(), actual.len());
    }

    #[tokio::test]
    async fn test_scan_wal() {
        let env = WalEnv::new().await;
        let wal = env.new_wal();

        let entries = sample_entries();
        let (id1, id2) = (RegionId::new(1, 1), RegionId::new(1, 2));
        let ns1 = Provider::raft_engine_provider(id1.as_u64());
        let ns2 = Provider::raft_engine_provider(id2.as_u64());
        let mut writer = wal.writer();
        writer.add_entry(id1, 1, &entries[0], &ns1).unwrap();
        // Insert one entry into region 2. Scanning region 1 should not return this entry.
        writer.add_entry(id2, 1, &entries[0], &ns2).unwrap();
        writer.add_entry(id1, 2, &entries[1], &ns1).unwrap();
        writer.add_entry(id1, 3, &entries[2], &ns1).unwrap();
        writer.add_entry(id1, 4, &entries[3], &ns1).unwrap();

        writer.write_to_wal().await.unwrap();

        // Scan all contents of region 1.
        let stream = wal.scan(id1, 1, &ns1).unwrap();
        let actual: Vec<_> = stream.try_collect().await.unwrap();
        check_entries(&entries, 1, &actual);

        // Scan part of the contents.
        let stream = wal.scan(id1, 2, &ns1).unwrap();
        let actual: Vec<_> = stream.try_collect().await.unwrap();
        check_entries(&entries[1..], 2, &actual);

        // Scan out of range.
        let stream = wal.scan(id1, 5, &ns1).unwrap();
        let actual: Vec<_> = stream.try_collect().await.unwrap();
        assert!(actual.is_empty());
    }

    #[tokio::test]
    async fn test_obsolete_wal() {
        let env = WalEnv::new().await;
        let wal = env.new_wal();

        let entries = sample_entries();
        let mut writer = wal.writer();
        let region_id = RegionId::new(1, 1);
        let ns = Provider::raft_engine_provider(region_id.as_u64());
        writer.add_entry(region_id, 1, &entries[0], &ns).unwrap();
        writer.add_entry(region_id, 2, &entries[1], &ns).unwrap();
        writer.add_entry(region_id, 3, &entries[2], &ns).unwrap();

        writer.write_to_wal().await.unwrap();

        // Delete 1, 2.
        wal.obsolete(region_id, 2, &ns).await.unwrap();

        // Put 4.
        let mut writer = wal.writer();
        writer.add_entry(region_id, 4, &entries[3], &ns).unwrap();
        writer.write_to_wal().await.unwrap();

        // Scan all
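        // Entries 1 and 2 were marked obsolete, so only entries 3 and 4 should remain.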
        let stream = wal.scan(region_id, 1, &ns).unwrap();
        let actual: Vec<_> = stream.try_collect().await.unwrap();
        check_entries(&entries[2..], 3, &actual);
    }
}