mito2/
wal.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Write ahead log of the engine.
16
17pub(crate) mod entry_distributor;
18pub(crate) mod entry_reader;
19pub(crate) mod raw_entry_reader;
20
21use std::collections::HashMap;
22use std::mem;
23use std::sync::Arc;
24
25use api::v1::WalEntry;
26use common_error::ext::BoxedError;
27use common_telemetry::debug;
28use entry_reader::NoopEntryReader;
29use futures::future::BoxFuture;
30use futures::stream::BoxStream;
31use prost::Message;
32use snafu::ResultExt;
33use store_api::logstore::entry::Entry;
34use store_api::logstore::provider::Provider;
35use store_api::logstore::{AppendBatchResponse, LogStore, WalIndex};
36use store_api::storage::RegionId;
37
38use crate::error::{BuildEntrySnafu, DeleteWalSnafu, Result, WriteWalSnafu};
39use crate::wal::entry_reader::{LogStoreEntryReader, WalEntryReader};
40use crate::wal::raw_entry_reader::{LogStoreRawEntryReader, RegionRawEntryReader};
41
/// Identifier of a WAL entry.
///
/// Alias of the entry id type defined by the log store API.
pub type EntryId = store_api::logstore::entry::Id;
/// A boxed stream that yields `(EntryId, WalEntry)` tuples, each wrapped in a
/// [Result] so that a failure can be reported per item.
pub type WalEntryStream<'a> = BoxStream<'a, Result<(EntryId, WalEntry)>>;
46
/// Write ahead log.
///
/// All regions in the engine share the same WAL instance.
///
/// The log store is held behind an [Arc], so cloning a [Wal] produces another
/// handle to the same store rather than a copy of it.
#[derive(Debug)]
pub struct Wal<S> {
    /// The underlying log store.
    store: Arc<S>,
}
55
56impl<S> Wal<S> {
57    /// Creates a new [Wal] from the log store.
58    pub fn new(store: Arc<S>) -> Self {
59        Self { store }
60    }
61
62    pub fn store(&self) -> &Arc<S> {
63        &self.store
64    }
65}
66
67impl<S> Clone for Wal<S> {
68    fn clone(&self) -> Self {
69        Self {
70            store: Arc::clone(&self.store),
71        }
72    }
73}
74
75impl<S: LogStore> Wal<S> {
76    /// Returns a writer to write to the WAL.
77    pub fn writer(&self) -> WalWriter<S> {
78        WalWriter {
79            store: self.store.clone(),
80            entries: Vec::new(),
81            providers: HashMap::new(),
82        }
83    }
84
85    /// Returns a [OnRegionOpened] function.
86    pub(crate) fn on_region_opened(
87        &self,
88    ) -> impl FnOnce(RegionId, EntryId, &Provider) -> BoxFuture<Result<()>> {
89        let store = self.store.clone();
90        move |region_id, last_entry_id, provider| -> BoxFuture<'_, Result<()>> {
91            if let Provider::Noop = provider {
92                debug!("Skip obsolete for region: {}", region_id);
93                return Box::pin(async move { Ok(()) });
94            }
95            Box::pin(async move {
96                store
97                    .obsolete(provider, region_id, last_entry_id)
98                    .await
99                    .map_err(BoxedError::new)
100                    .context(DeleteWalSnafu { region_id })
101            })
102        }
103    }
104
105    /// Returns a [WalEntryReader]
106    pub(crate) fn wal_entry_reader(
107        &self,
108        provider: &Provider,
109        region_id: RegionId,
110        location_id: Option<u64>,
111    ) -> Box<dyn WalEntryReader> {
112        match provider {
113            Provider::RaftEngine(_) => Box::new(LogStoreEntryReader::new(
114                LogStoreRawEntryReader::new(self.store.clone()),
115            )),
116            Provider::Kafka(_) => {
117                let reader = if let Some(location_id) = location_id {
118                    LogStoreRawEntryReader::new(self.store.clone())
119                        .with_wal_index(WalIndex::new(region_id, location_id))
120                } else {
121                    LogStoreRawEntryReader::new(self.store.clone())
122                };
123
124                Box::new(LogStoreEntryReader::new(RegionRawEntryReader::new(
125                    reader, region_id,
126                )))
127            }
128            Provider::Noop => Box::new(NoopEntryReader),
129        }
130    }
131
132    /// Scan entries of specific region starting from `start_id` (inclusive).
133    /// Currently only used in tests.
134    pub fn scan<'a>(
135        &'a self,
136        region_id: RegionId,
137        start_id: EntryId,
138        provider: &'a Provider,
139    ) -> Result<WalEntryStream<'a>> {
140        let mut reader = self.wal_entry_reader(provider, region_id, None);
141        reader.read(provider, start_id)
142    }
143
144    /// Mark entries whose ids `<= last_id` as deleted.
145    pub async fn obsolete(
146        &self,
147        region_id: RegionId,
148        last_id: EntryId,
149        provider: &Provider,
150    ) -> Result<()> {
151        if let Provider::Noop = provider {
152            return Ok(());
153        }
154        self.store
155            .obsolete(provider, region_id, last_id)
156            .await
157            .map_err(BoxedError::new)
158            .context(DeleteWalSnafu { region_id })
159    }
160}
161
/// WAL batch writer.
///
/// Entries are buffered by [WalWriter::add_entry] and appended to the log
/// store in a single batch by [WalWriter::write_to_wal].
pub struct WalWriter<S: LogStore> {
    /// Log store of the WAL.
    store: Arc<S>,
    /// Entries buffered so far and not yet written.
    entries: Vec<Entry>,
    /// Providers of regions being written into, keyed by region id.
    providers: HashMap<RegionId, Provider>,
}
171
172impl<S: LogStore> WalWriter<S> {
173    /// Add a wal entry for specific region to the writer's buffer.
174    pub fn add_entry(
175        &mut self,
176        region_id: RegionId,
177        entry_id: EntryId,
178        wal_entry: &WalEntry,
179        provider: &Provider,
180    ) -> Result<()> {
181        // Gets or inserts with a newly built provider.
182        let provider = self
183            .providers
184            .entry(region_id)
185            .or_insert_with(|| provider.clone());
186
187        let data = wal_entry.encode_to_vec();
188        let entry = self
189            .store
190            .entry(data, entry_id, region_id, provider)
191            .map_err(BoxedError::new)
192            .context(BuildEntrySnafu { region_id })?;
193
194        self.entries.push(entry);
195
196        Ok(())
197    }
198
199    /// Write all buffered entries to the WAL.
200    pub async fn write_to_wal(&mut self) -> Result<AppendBatchResponse> {
201        // TODO(yingwen): metrics.
202
203        let entries = mem::take(&mut self.entries);
204        self.store
205            .append_batch(entries)
206            .await
207            .map_err(BoxedError::new)
208            .context(WriteWalSnafu)
209    }
210}
211
#[cfg(test)]
mod tests {
    use api::v1::helper::{tag_column_schema, time_index_column_schema};
    use api::v1::{
        ArrowIpc, BulkWalEntry, ColumnDataType, Mutation, OpType, Row, Rows, Value, bulk_wal_entry,
        value,
    };
    use common_recordbatch::DfRecordBatch;
    use common_test_util::flight::encode_to_flight_data;
    use common_test_util::temp_dir::{TempDir, create_temp_dir};
    use datatypes::arrow;
    use datatypes::arrow::array::{ArrayRef, TimestampMillisecondArray};
    use datatypes::arrow::datatypes::Field;
    use datatypes::arrow_array::StringArray;
    use futures::TryStreamExt;
    use log_store::raft_engine::log_store::RaftEngineLogStore;
    use log_store::test_util::log_store_util;
    use store_api::storage::SequenceNumber;

    use super::*;

    /// Test environment owning a temporary WAL directory and a raft-engine
    /// log store created inside it.
    struct WalEnv {
        /// Temporary directory handle; stored (but otherwise unused) so that it
        /// lives as long as the environment.
        _wal_dir: TempDir,
        /// Log store created under `_wal_dir`.
        log_store: Option<Arc<RaftEngineLogStore>>,
    }

    impl WalEnv {
        /// Creates a temporary directory and a local file log store in it.
        async fn new() -> WalEnv {
            let wal_dir = create_temp_dir("");
            let log_store =
                log_store_util::create_tmp_local_file_log_store(wal_dir.path().to_str().unwrap())
                    .await;
            WalEnv {
                _wal_dir: wal_dir,
                log_store: Some(Arc::new(log_store)),
            }
        }

        /// Returns a [Wal] backed by the environment's log store.
        fn new_wal(&self) -> Wal<RaftEngineLogStore> {
            let log_store = self.log_store.clone().unwrap();
            Wal::new(log_store)
        }
    }

    /// Create a new mutation from rows.
    ///
    /// The row format is (string, i64): the string becomes the `tag` column and
    /// the integer becomes the millisecond timestamp `ts` column.
    fn new_mutation(op_type: OpType, sequence: SequenceNumber, rows: &[(&str, i64)]) -> Mutation {
        let rows = rows
            .iter()
            .map(|(str_col, int_col)| {
                let values = vec![
                    Value {
                        value_data: Some(value::ValueData::StringValue(str_col.to_string())),
                    },
                    Value {
                        value_data: Some(value::ValueData::TimestampMillisecondValue(*int_col)),
                    },
                ];
                Row { values }
            })
            .collect();
        let schema = vec![
            tag_column_schema("tag", ColumnDataType::String),
            time_index_column_schema("ts", ColumnDataType::TimestampMillisecond),
        ];

        Mutation {
            op_type: op_type as i32,
            sequence,
            rows: Some(Rows { schema, rows }),
            write_hint: None,
        }
    }

    /// Buffers entries for two regions in one writer and writes them in a
    /// single batch.
    #[tokio::test]
    async fn test_write_wal() {
        let env = WalEnv::new().await;
        let wal = env.new_wal();

        let entry = WalEntry {
            mutations: vec![
                new_mutation(OpType::Put, 1, &[("k1", 1), ("k2", 2)]),
                new_mutation(OpType::Put, 2, &[("k3", 3), ("k4", 4)]),
            ],
            bulk_entries: vec![],
        };
        let mut writer = wal.writer();
        // Region 1 entry 1.
        let region_id = RegionId::new(1, 1);
        writer
            .add_entry(
                region_id,
                1,
                &entry,
                &Provider::raft_engine_provider(region_id.as_u64()),
            )
            .unwrap();
        // Region 2 entry 1.
        let region_id = RegionId::new(1, 2);
        writer
            .add_entry(
                region_id,
                1,
                &entry,
                &Provider::raft_engine_provider(region_id.as_u64()),
            )
            .unwrap();
        // Region 1 entry 2. Goes back to the first region so that its entries
        // are interleaved with region 2's in the same batch.
        let region_id = RegionId::new(1, 1);
        writer
            .add_entry(
                region_id,
                2,
                &entry,
                &Provider::raft_engine_provider(region_id.as_u64()),
            )
            .unwrap();

        // Test writing multiple region to wal.
        writer.write_to_wal().await.unwrap();
    }

    /// Builds a record batch with a non-null `tag` (utf8) column and a non-null
    /// `ts` (millisecond timestamp) column from `(tag, ts)` rows.
    fn build_record_batch(rows: &[(&str, i64)]) -> DfRecordBatch {
        let schema = Arc::new(arrow::datatypes::Schema::new(vec![
            Field::new("tag", arrow::datatypes::DataType::Utf8, false),
            Field::new(
                "ts",
                arrow::datatypes::DataType::Timestamp(
                    arrow::datatypes::TimeUnit::Millisecond,
                    None,
                ),
                false,
            ),
        ]));

        let tag = Arc::new(StringArray::from_iter_values(
            rows.iter().map(|r| r.0.to_string()),
        )) as ArrayRef;
        let ts = Arc::new(TimestampMillisecondArray::from_iter_values(
            rows.iter().map(|r| r.1),
        )) as ArrayRef;
        DfRecordBatch::try_new(schema, vec![tag, ts]).unwrap()
    }

    /// Builds a [BulkWalEntry] whose body is the Arrow IPC encoding of `rows`.
    ///
    /// `min_ts` / `max_ts` are taken from the rows' timestamps, so `rows` must
    /// not be empty (the `unwrap` calls panic otherwise).
    fn build_bulk_wal_entry(sequence_number: SequenceNumber, rows: &[(&str, i64)]) -> BulkWalEntry {
        let rb = build_record_batch(rows);
        let (schema, rb) = encode_to_flight_data(rb);
        let max_ts = rows.iter().map(|r| r.1).max().unwrap();
        let min_ts = rows.iter().map(|r| r.1).min().unwrap();
        BulkWalEntry {
            sequence: sequence_number,
            max_ts,
            min_ts,
            // `ts` is the second column of the batch built above.
            timestamp_index: 1,
            body: Some(bulk_wal_entry::Body::ArrowIpc(ArrowIpc {
                schema: schema.data_header,
                data_header: rb.data_header,
                payload: rb.data_body,
            })),
        }
    }

    /// Returns four sample WAL entries; the last one also carries a bulk entry.
    fn sample_entries() -> Vec<WalEntry> {
        vec![
            WalEntry {
                mutations: vec![
                    new_mutation(OpType::Put, 1, &[("k1", 1), ("k2", 2)]),
                    new_mutation(OpType::Put, 2, &[("k3", 3), ("k4", 4)]),
                ],
                bulk_entries: vec![],
            },
            WalEntry {
                mutations: vec![new_mutation(OpType::Put, 3, &[("k1", 1), ("k2", 2)])],
                bulk_entries: vec![],
            },
            WalEntry {
                mutations: vec![
                    new_mutation(OpType::Put, 4, &[("k1", 1), ("k2", 2)]),
                    new_mutation(OpType::Put, 5, &[("k3", 3), ("k4", 4)]),
                ],
                bulk_entries: vec![],
            },
            WalEntry {
                mutations: vec![new_mutation(OpType::Put, 6, &[("k1", 1), ("k2", 2)])],
                bulk_entries: vec![build_bulk_wal_entry(7, &[("k1", 8), ("k2", 9)])],
            },
        ]
    }

    /// Asserts that `actual` holds exactly the entries in `expect`, with
    /// consecutive ids starting at `expect_start_id`.
    fn check_entries(
        expect: &[WalEntry],
        expect_start_id: EntryId,
        actual: &[(EntryId, WalEntry)],
    ) {
        for (idx, (expect_entry, (actual_id, actual_entry))) in
            expect.iter().zip(actual.iter()).enumerate()
        {
            let expect_id_entry = (expect_start_id + idx as u64, expect_entry);
            assert_eq!(expect_id_entry, (*actual_id, actual_entry));
        }
        // `zip` stops at the shorter side, so the lengths are compared separately.
        assert_eq!(expect.len(), actual.len());
    }

    /// Scanning a region returns only that region's entries, from the
    /// requested start id onwards.
    #[tokio::test]
    async fn test_scan_wal() {
        let env = WalEnv::new().await;
        let wal = env.new_wal();

        let entries = sample_entries();
        let (id1, id2) = (RegionId::new(1, 1), RegionId::new(1, 2));
        let ns1 = Provider::raft_engine_provider(id1.as_u64());
        let ns2 = Provider::raft_engine_provider(id2.as_u64());
        let mut writer = wal.writer();
        writer.add_entry(id1, 1, &entries[0], &ns1).unwrap();
        // Insert one entry into region2. Scan should not return this entry.
        writer.add_entry(id2, 1, &entries[0], &ns2).unwrap();
        writer.add_entry(id1, 2, &entries[1], &ns1).unwrap();
        writer.add_entry(id1, 3, &entries[2], &ns1).unwrap();
        writer.add_entry(id1, 4, &entries[3], &ns1).unwrap();

        writer.write_to_wal().await.unwrap();

        // Scan all contents region1
        let stream = wal.scan(id1, 1, &ns1).unwrap();
        let actual: Vec<_> = stream.try_collect().await.unwrap();
        check_entries(&entries, 1, &actual);

        // Scan parts of contents
        let stream = wal.scan(id1, 2, &ns1).unwrap();
        let actual: Vec<_> = stream.try_collect().await.unwrap();
        check_entries(&entries[1..], 2, &actual);

        // Scan out of range
        let stream = wal.scan(id1, 5, &ns1).unwrap();
        let actual: Vec<_> = stream.try_collect().await.unwrap();
        assert!(actual.is_empty());
    }

    /// Entries marked obsolete are no longer returned by a scan, while later
    /// entries still are.
    #[tokio::test]
    async fn test_obsolete_wal() {
        let env = WalEnv::new().await;
        let wal = env.new_wal();

        let entries = sample_entries();
        let mut writer = wal.writer();
        let region_id = RegionId::new(1, 1);
        let ns = Provider::raft_engine_provider(region_id.as_u64());
        writer.add_entry(region_id, 1, &entries[0], &ns).unwrap();
        writer.add_entry(region_id, 2, &entries[1], &ns).unwrap();
        writer.add_entry(region_id, 3, &entries[2], &ns).unwrap();

        writer.write_to_wal().await.unwrap();

        // Delete 1, 2.
        wal.obsolete(region_id, 2, &ns).await.unwrap();

        // Put 4.
        let mut writer = wal.writer();
        writer.add_entry(region_id, 4, &entries[3], &ns).unwrap();
        writer.write_to_wal().await.unwrap();

        // Scan all
        let stream = wal.scan(region_id, 1, &ns).unwrap();
        let actual: Vec<_> = stream.try_collect().await.unwrap();
        check_entries(&entries[2..], 3, &actual);
    }
}
479}