1pub(crate) mod entry_distributor;
18pub(crate) mod entry_reader;
19pub(crate) mod raw_entry_reader;
20
21use std::collections::HashMap;
22use std::mem;
23use std::sync::Arc;
24
25use api::v1::WalEntry;
26use common_error::ext::BoxedError;
27use common_telemetry::debug;
28use entry_reader::NoopEntryReader;
29use futures::future::BoxFuture;
30use futures::stream::BoxStream;
31use prost::Message;
32use snafu::ResultExt;
33use store_api::logstore::entry::Entry;
34use store_api::logstore::provider::Provider;
35use store_api::logstore::{AppendBatchResponse, LogStore, WalIndex};
36use store_api::storage::RegionId;
37
38use crate::error::{BuildEntrySnafu, DeleteWalSnafu, EncodeWalSnafu, Result, WriteWalSnafu};
39use crate::wal::entry_reader::{LogStoreEntryReader, WalEntryReader};
40use crate::wal::raw_entry_reader::{LogStoreRawEntryReader, RegionRawEntryReader};
41
/// Identifier of a WAL entry; an alias of the log store's entry id type.
pub type EntryId = store_api::logstore::entry::Id;
/// A boxed stream of results whose `Ok` items pair an [`EntryId`] with its decoded [`WalEntry`].
pub type WalEntryStream<'a> = BoxStream<'a, Result<(EntryId, WalEntry)>>;
46
/// Write-ahead log handle that wraps a shared log store of type `S`.
///
/// The handle only holds an [`Arc`] to the store, so cloning it is cheap and
/// every clone refers to the same underlying store instance.
#[derive(Debug)]
pub struct Wal<S> {
    /// Shared reference to the underlying log store.
    store: Arc<S>,
}

impl<S> Wal<S> {
    /// Creates a new [`Wal`] backed by the given log store.
    pub fn new(store: Arc<S>) -> Self {
        Wal { store }
    }

    /// Returns the shared log store this WAL is backed by.
    pub fn store(&self) -> &Arc<S> {
        let Wal { store } = self;
        store
    }
}

// `Clone` is implemented by hand because `#[derive(Clone)]` would add an
// `S: Clone` bound, while only the `Arc` needs to be cloned here.
impl<S> Clone for Wal<S> {
    fn clone(&self) -> Self {
        Self::new(Arc::clone(&self.store))
    }
}
74
75impl<S: LogStore> Wal<S> {
76 pub fn writer(&self) -> WalWriter<S> {
78 WalWriter {
79 store: self.store.clone(),
80 entries: Vec::new(),
81 entry_encode_buf: Vec::new(),
82 providers: HashMap::new(),
83 }
84 }
85
86 pub(crate) fn on_region_opened(
88 &self,
89 ) -> impl FnOnce(RegionId, EntryId, &Provider) -> BoxFuture<Result<()>> {
90 let store = self.store.clone();
91 move |region_id, last_entry_id, provider| -> BoxFuture<'_, Result<()>> {
92 if let Provider::Noop = provider {
93 debug!("Skip obsolete for region: {}", region_id);
94 return Box::pin(async move { Ok(()) });
95 }
96 Box::pin(async move {
97 store
98 .obsolete(provider, region_id, last_entry_id)
99 .await
100 .map_err(BoxedError::new)
101 .context(DeleteWalSnafu { region_id })
102 })
103 }
104 }
105
106 pub(crate) fn wal_entry_reader(
108 &self,
109 provider: &Provider,
110 region_id: RegionId,
111 location_id: Option<u64>,
112 ) -> Box<dyn WalEntryReader> {
113 match provider {
114 Provider::RaftEngine(_) => Box::new(LogStoreEntryReader::new(
115 LogStoreRawEntryReader::new(self.store.clone()),
116 )),
117 Provider::Kafka(_) => {
118 let reader = if let Some(location_id) = location_id {
119 LogStoreRawEntryReader::new(self.store.clone())
120 .with_wal_index(WalIndex::new(region_id, location_id))
121 } else {
122 LogStoreRawEntryReader::new(self.store.clone())
123 };
124
125 Box::new(LogStoreEntryReader::new(RegionRawEntryReader::new(
126 reader, region_id,
127 )))
128 }
129 Provider::Noop => Box::new(NoopEntryReader),
130 }
131 }
132
133 pub fn scan<'a>(
136 &'a self,
137 region_id: RegionId,
138 start_id: EntryId,
139 provider: &'a Provider,
140 ) -> Result<WalEntryStream<'a>> {
141 match provider {
142 Provider::RaftEngine(_) => {
143 LogStoreEntryReader::new(LogStoreRawEntryReader::new(self.store.clone()))
144 .read(provider, start_id)
145 }
146 Provider::Kafka(_) => LogStoreEntryReader::new(RegionRawEntryReader::new(
147 LogStoreRawEntryReader::new(self.store.clone()),
148 region_id,
149 ))
150 .read(provider, start_id),
151 Provider::Noop => Ok(Box::pin(futures::stream::empty())),
152 }
153 }
154
155 pub async fn obsolete(
157 &self,
158 region_id: RegionId,
159 last_id: EntryId,
160 provider: &Provider,
161 ) -> Result<()> {
162 if let Provider::Noop = provider {
163 return Ok(());
164 }
165 self.store
166 .obsolete(provider, region_id, last_id)
167 .await
168 .map_err(BoxedError::new)
169 .context(DeleteWalSnafu { region_id })
170 }
171}
172
173pub struct WalWriter<S: LogStore> {
175 store: Arc<S>,
177 entries: Vec<Entry>,
179 entry_encode_buf: Vec<u8>,
181 providers: HashMap<RegionId, Provider>,
183}
184
185impl<S: LogStore> WalWriter<S> {
186 pub fn add_entry(
188 &mut self,
189 region_id: RegionId,
190 entry_id: EntryId,
191 wal_entry: &WalEntry,
192 provider: &Provider,
193 ) -> Result<()> {
194 let provider = self
196 .providers
197 .entry(region_id)
198 .or_insert_with(|| provider.clone());
199
200 self.entry_encode_buf.clear();
202 wal_entry
203 .encode(&mut self.entry_encode_buf)
204 .context(EncodeWalSnafu { region_id })?;
205 let entry = self
206 .store
207 .entry(&mut self.entry_encode_buf, entry_id, region_id, provider)
208 .map_err(BoxedError::new)
209 .context(BuildEntrySnafu { region_id })?;
210
211 self.entries.push(entry);
212
213 Ok(())
214 }
215
216 pub async fn write_to_wal(&mut self) -> Result<AppendBatchResponse> {
218 let entries = mem::take(&mut self.entries);
221 self.store
222 .append_batch(entries)
223 .await
224 .map_err(BoxedError::new)
225 .context(WriteWalSnafu)
226 }
227}
228
229#[cfg(test)]
230mod tests {
231 use api::v1::{
232 value, ColumnDataType, ColumnSchema, Mutation, OpType, Row, Rows, SemanticType, Value,
233 };
234 use common_test_util::temp_dir::{create_temp_dir, TempDir};
235 use futures::TryStreamExt;
236 use log_store::raft_engine::log_store::RaftEngineLogStore;
237 use log_store::test_util::log_store_util;
238 use store_api::storage::SequenceNumber;
239
240 use super::*;
241
242 struct WalEnv {
243 _wal_dir: TempDir,
244 log_store: Option<Arc<RaftEngineLogStore>>,
245 }
246
247 impl WalEnv {
248 async fn new() -> WalEnv {
249 let wal_dir = create_temp_dir("");
250 let log_store =
251 log_store_util::create_tmp_local_file_log_store(wal_dir.path().to_str().unwrap())
252 .await;
253 WalEnv {
254 _wal_dir: wal_dir,
255 log_store: Some(Arc::new(log_store)),
256 }
257 }
258
259 fn new_wal(&self) -> Wal<RaftEngineLogStore> {
260 let log_store = self.log_store.clone().unwrap();
261 Wal::new(log_store)
262 }
263 }
264
265 fn new_mutation(op_type: OpType, sequence: SequenceNumber, rows: &[(&str, i64)]) -> Mutation {
269 let rows = rows
270 .iter()
271 .map(|(str_col, int_col)| {
272 let values = vec![
273 Value {
274 value_data: Some(value::ValueData::StringValue(str_col.to_string())),
275 },
276 Value {
277 value_data: Some(value::ValueData::TimestampMillisecondValue(*int_col)),
278 },
279 ];
280 Row { values }
281 })
282 .collect();
283 let schema = vec![
284 ColumnSchema {
285 column_name: "tag".to_string(),
286 datatype: ColumnDataType::String as i32,
287 semantic_type: SemanticType::Tag as i32,
288 ..Default::default()
289 },
290 ColumnSchema {
291 column_name: "ts".to_string(),
292 datatype: ColumnDataType::TimestampMillisecond as i32,
293 semantic_type: SemanticType::Timestamp as i32,
294 ..Default::default()
295 },
296 ];
297
298 Mutation {
299 op_type: op_type as i32,
300 sequence,
301 rows: Some(Rows { schema, rows }),
302 write_hint: None,
303 }
304 }
305
306 #[tokio::test]
307 async fn test_write_wal() {
308 let env = WalEnv::new().await;
309 let wal = env.new_wal();
310
311 let entry = WalEntry {
312 mutations: vec![
313 new_mutation(OpType::Put, 1, &[("k1", 1), ("k2", 2)]),
314 new_mutation(OpType::Put, 2, &[("k3", 3), ("k4", 4)]),
315 ],
316 };
317 let mut writer = wal.writer();
318 let region_id = RegionId::new(1, 1);
320 writer
321 .add_entry(
322 region_id,
323 1,
324 &entry,
325 &Provider::raft_engine_provider(region_id.as_u64()),
326 )
327 .unwrap();
328 let region_id = RegionId::new(1, 2);
330 writer
331 .add_entry(
332 region_id,
333 1,
334 &entry,
335 &Provider::raft_engine_provider(region_id.as_u64()),
336 )
337 .unwrap();
338 let region_id = RegionId::new(1, 2);
340 writer
341 .add_entry(
342 region_id,
343 2,
344 &entry,
345 &Provider::raft_engine_provider(region_id.as_u64()),
346 )
347 .unwrap();
348
349 writer.write_to_wal().await.unwrap();
351 }
352
353 fn sample_entries() -> Vec<WalEntry> {
354 vec![
355 WalEntry {
356 mutations: vec![
357 new_mutation(OpType::Put, 1, &[("k1", 1), ("k2", 2)]),
358 new_mutation(OpType::Put, 2, &[("k3", 3), ("k4", 4)]),
359 ],
360 },
361 WalEntry {
362 mutations: vec![new_mutation(OpType::Put, 3, &[("k1", 1), ("k2", 2)])],
363 },
364 WalEntry {
365 mutations: vec![
366 new_mutation(OpType::Put, 4, &[("k1", 1), ("k2", 2)]),
367 new_mutation(OpType::Put, 5, &[("k3", 3), ("k4", 4)]),
368 ],
369 },
370 WalEntry {
371 mutations: vec![new_mutation(OpType::Put, 6, &[("k1", 1), ("k2", 2)])],
372 },
373 ]
374 }
375
376 fn check_entries(
377 expect: &[WalEntry],
378 expect_start_id: EntryId,
379 actual: &[(EntryId, WalEntry)],
380 ) {
381 for (idx, (expect_entry, (actual_id, actual_entry))) in
382 expect.iter().zip(actual.iter()).enumerate()
383 {
384 let expect_id_entry = (expect_start_id + idx as u64, expect_entry);
385 assert_eq!(expect_id_entry, (*actual_id, actual_entry));
386 }
387 assert_eq!(expect.len(), actual.len());
388 }
389
390 #[tokio::test]
391 async fn test_scan_wal() {
392 let env = WalEnv::new().await;
393 let wal = env.new_wal();
394
395 let entries = sample_entries();
396 let (id1, id2) = (RegionId::new(1, 1), RegionId::new(1, 2));
397 let ns1 = Provider::raft_engine_provider(id1.as_u64());
398 let ns2 = Provider::raft_engine_provider(id2.as_u64());
399 let mut writer = wal.writer();
400 writer.add_entry(id1, 1, &entries[0], &ns1).unwrap();
401 writer.add_entry(id2, 1, &entries[0], &ns2).unwrap();
403 writer.add_entry(id1, 2, &entries[1], &ns1).unwrap();
404 writer.add_entry(id1, 3, &entries[2], &ns1).unwrap();
405 writer.add_entry(id1, 4, &entries[3], &ns1).unwrap();
406
407 writer.write_to_wal().await.unwrap();
408
409 let stream = wal.scan(id1, 1, &ns1).unwrap();
411 let actual: Vec<_> = stream.try_collect().await.unwrap();
412 check_entries(&entries, 1, &actual);
413
414 let stream = wal.scan(id1, 2, &ns1).unwrap();
416 let actual: Vec<_> = stream.try_collect().await.unwrap();
417 check_entries(&entries[1..], 2, &actual);
418
419 let stream = wal.scan(id1, 5, &ns1).unwrap();
421 let actual: Vec<_> = stream.try_collect().await.unwrap();
422 assert!(actual.is_empty());
423 }
424
425 #[tokio::test]
426 async fn test_obsolete_wal() {
427 let env = WalEnv::new().await;
428 let wal = env.new_wal();
429
430 let entries = sample_entries();
431 let mut writer = wal.writer();
432 let region_id = RegionId::new(1, 1);
433 let ns = Provider::raft_engine_provider(region_id.as_u64());
434 writer.add_entry(region_id, 1, &entries[0], &ns).unwrap();
435 writer.add_entry(region_id, 2, &entries[1], &ns).unwrap();
436 writer.add_entry(region_id, 3, &entries[2], &ns).unwrap();
437
438 writer.write_to_wal().await.unwrap();
439
440 wal.obsolete(region_id, 2, &ns).await.unwrap();
442
443 let mut writer = wal.writer();
445 writer.add_entry(region_id, 4, &entries[3], &ns).unwrap();
446 writer.write_to_wal().await.unwrap();
447
448 let stream = wal.scan(region_id, 1, &ns).unwrap();
450 let actual: Vec<_> = stream.try_collect().await.unwrap();
451 check_entries(&entries[2..], 3, &actual);
452 }
453}