1pub(crate) mod entry_distributor;
18pub(crate) mod entry_reader;
19pub(crate) mod raw_entry_reader;
20
21use std::collections::HashMap;
22use std::mem;
23use std::sync::Arc;
24
25use api::v1::WalEntry;
26use common_error::ext::BoxedError;
27use common_telemetry::debug;
28use entry_reader::NoopEntryReader;
29use futures::future::BoxFuture;
30use futures::stream::BoxStream;
31use prost::Message;
32use snafu::ResultExt;
33use store_api::logstore::entry::Entry;
34use store_api::logstore::provider::Provider;
35use store_api::logstore::{AppendBatchResponse, LogStore, WalIndex};
36use store_api::storage::RegionId;
37
38use crate::error::{BuildEntrySnafu, DeleteWalSnafu, Result, WriteWalSnafu};
39use crate::wal::entry_reader::{LogStoreEntryReader, WalEntryReader};
40use crate::wal::raw_entry_reader::{LogStoreRawEntryReader, RegionRawEntryReader};
41
/// WAL entry id, re-exported from the log store entry id type.
pub type EntryId = store_api::logstore::entry::Id;
/// Stream of decoded `(EntryId, WalEntry)` pairs, as produced by [`Wal::scan`].
pub type WalEntryStream<'a> = BoxStream<'a, Result<(EntryId, WalEntry)>>;
46
/// Write-ahead log abstraction over a log store `S`.
///
/// Cheap to clone: all clones share the same underlying log store
/// (see the manual `Clone` impl below).
#[derive(Debug)]
pub struct Wal<S> {
    // Shared log store backend; wrapped in `Arc` so writers, readers and
    // callbacks can each hold a handle.
    store: Arc<S>,
}
55
56impl<S> Wal<S> {
57 pub fn new(store: Arc<S>) -> Self {
59 Self { store }
60 }
61
62 pub fn store(&self) -> &Arc<S> {
63 &self.store
64 }
65}
66
67impl<S> Clone for Wal<S> {
68 fn clone(&self) -> Self {
69 Self {
70 store: self.store.clone(),
71 }
72 }
73}
74
impl<S: LogStore> Wal<S> {
    /// Returns a writer that buffers entries for one or more regions and
    /// appends them to the log store in a single batch.
    pub fn writer(&self) -> WalWriter<S> {
        WalWriter {
            store: self.store.clone(),
            entries: Vec::new(),
            providers: HashMap::new(),
        }
    }

    /// Returns a callback to run when a region is opened: it marks the
    /// region's WAL entries up to `last_entry_id` as obsolete.
    ///
    /// For a `Provider::Noop` the callback logs and completes immediately
    /// without touching the store.
    pub(crate) fn on_region_opened(
        &self,
    ) -> impl FnOnce(RegionId, EntryId, &Provider) -> BoxFuture<Result<()>> {
        // Clone the store handle so the returned closure is self-contained.
        let store = self.store.clone();
        move |region_id, last_entry_id, provider| -> BoxFuture<'_, Result<()>> {
            if let Provider::Noop = provider {
                debug!("Skip obsolete for region: {}", region_id);
                return Box::pin(async move { Ok(()) });
            }
            Box::pin(async move {
                store
                    .obsolete(provider, region_id, last_entry_id)
                    .await
                    .map_err(BoxedError::new)
                    .context(DeleteWalSnafu { region_id })
            })
        }
    }

    /// Builds a [`WalEntryReader`] matching the provider kind.
    ///
    /// For Kafka providers, the raw reader is filtered down to `region_id`,
    /// and `location_id` (when present) seeds a [`WalIndex`] for the region.
    /// `Provider::Noop` yields a reader that produces no entries.
    pub(crate) fn wal_entry_reader(
        &self,
        provider: &Provider,
        region_id: RegionId,
        location_id: Option<u64>,
    ) -> Box<dyn WalEntryReader> {
        match provider {
            Provider::RaftEngine(_) => Box::new(LogStoreEntryReader::new(
                LogStoreRawEntryReader::new(self.store.clone()),
            )),
            Provider::Kafka(_) => {
                let reader = if let Some(location_id) = location_id {
                    LogStoreRawEntryReader::new(self.store.clone())
                        .with_wal_index(WalIndex::new(region_id, location_id))
                } else {
                    LogStoreRawEntryReader::new(self.store.clone())
                };

                Box::new(LogStoreEntryReader::new(RegionRawEntryReader::new(
                    reader, region_id,
                )))
            }
            Provider::Noop => Box::new(NoopEntryReader),
        }
    }

    /// Scans WAL entries of `region_id` starting from `start_id` (inclusive,
    /// per the tests below).
    ///
    /// A `Noop` provider returns an empty stream.
    pub fn scan<'a>(
        &'a self,
        region_id: RegionId,
        start_id: EntryId,
        provider: &'a Provider,
    ) -> Result<WalEntryStream<'a>> {
        match provider {
            Provider::RaftEngine(_) => {
                LogStoreEntryReader::new(LogStoreRawEntryReader::new(self.store.clone()))
                    .read(provider, start_id)
            }
            Provider::Kafka(_) => LogStoreEntryReader::new(RegionRawEntryReader::new(
                LogStoreRawEntryReader::new(self.store.clone()),
                region_id,
            ))
            .read(provider, start_id),
            Provider::Noop => Ok(Box::pin(futures::stream::empty())),
        }
    }

    /// Marks entries of `region_id` up to and including `last_id` as obsolete
    /// so the log store may reclaim them.
    ///
    /// No-op for `Provider::Noop`.
    pub async fn obsolete(
        &self,
        region_id: RegionId,
        last_id: EntryId,
        provider: &Provider,
    ) -> Result<()> {
        if let Provider::Noop = provider {
            return Ok(());
        }
        self.store
            .obsolete(provider, region_id, last_id)
            .await
            .map_err(BoxedError::new)
            .context(DeleteWalSnafu { region_id })
    }
}
171
/// Batching WAL writer: buffers encoded entries and appends them to the log
/// store in one `append_batch` call.
pub struct WalWriter<S: LogStore> {
    // Log store the batch is appended to.
    store: Arc<S>,
    // Entries buffered until `write_to_wal` is called.
    entries: Vec<Entry>,
    // Provider recorded per region on its first `add_entry`; later calls for
    // the same region reuse the stored provider.
    providers: HashMap<RegionId, Provider>,
}
181
182impl<S: LogStore> WalWriter<S> {
183 pub fn add_entry(
185 &mut self,
186 region_id: RegionId,
187 entry_id: EntryId,
188 wal_entry: &WalEntry,
189 provider: &Provider,
190 ) -> Result<()> {
191 let provider = self
193 .providers
194 .entry(region_id)
195 .or_insert_with(|| provider.clone());
196
197 let data = wal_entry.encode_to_vec();
198 let entry = self
199 .store
200 .entry(data, entry_id, region_id, provider)
201 .map_err(BoxedError::new)
202 .context(BuildEntrySnafu { region_id })?;
203
204 self.entries.push(entry);
205
206 Ok(())
207 }
208
209 pub async fn write_to_wal(&mut self) -> Result<AppendBatchResponse> {
211 let entries = mem::take(&mut self.entries);
214 self.store
215 .append_batch(entries)
216 .await
217 .map_err(BoxedError::new)
218 .context(WriteWalSnafu)
219 }
220}
221
#[cfg(test)]
mod tests {
    use api::v1::helper::{tag_column_schema, time_index_column_schema};
    use api::v1::{
        bulk_wal_entry, value, ArrowIpc, BulkWalEntry, ColumnDataType, Mutation, OpType, Row, Rows,
        Value,
    };
    use common_recordbatch::DfRecordBatch;
    use common_test_util::flight::encode_to_flight_data;
    use common_test_util::temp_dir::{create_temp_dir, TempDir};
    use datatypes::arrow;
    use datatypes::arrow::array::{ArrayRef, TimestampMillisecondArray};
    use datatypes::arrow::datatypes::Field;
    use datatypes::arrow_array::StringArray;
    use futures::TryStreamExt;
    use log_store::raft_engine::log_store::RaftEngineLogStore;
    use log_store::test_util::log_store_util;
    use store_api::storage::SequenceNumber;

    use super::*;

    /// Test fixture: a raft-engine log store backed by a temporary directory.
    struct WalEnv {
        // Held only to keep the temp directory alive for the store's lifetime.
        _wal_dir: TempDir,
        log_store: Option<Arc<RaftEngineLogStore>>,
    }

    impl WalEnv {
        /// Creates a fresh temp dir and a local-file raft-engine log store in it.
        async fn new() -> WalEnv {
            let wal_dir = create_temp_dir("");
            let log_store =
                log_store_util::create_tmp_local_file_log_store(wal_dir.path().to_str().unwrap())
                    .await;
            WalEnv {
                _wal_dir: wal_dir,
                log_store: Some(Arc::new(log_store)),
            }
        }

        /// Creates a `Wal` sharing this environment's log store.
        fn new_wal(&self) -> Wal<RaftEngineLogStore> {
            let log_store = self.log_store.clone().unwrap();
            Wal::new(log_store)
        }
    }

    /// Builds a mutation with a `(tag: string, ts: timestamp-ms)` schema from
    /// `(tag, ts)` row tuples.
    fn new_mutation(op_type: OpType, sequence: SequenceNumber, rows: &[(&str, i64)]) -> Mutation {
        let rows = rows
            .iter()
            .map(|(str_col, int_col)| {
                let values = vec![
                    Value {
                        value_data: Some(value::ValueData::StringValue(str_col.to_string())),
                    },
                    Value {
                        value_data: Some(value::ValueData::TimestampMillisecondValue(*int_col)),
                    },
                ];
                Row { values }
            })
            .collect();
        let schema = vec![
            tag_column_schema("tag", ColumnDataType::String),
            time_index_column_schema("ts", ColumnDataType::TimestampMillisecond),
        ];

        Mutation {
            op_type: op_type as i32,
            sequence,
            rows: Some(Rows { schema, rows }),
            write_hint: None,
        }
    }

    /// Writes entries for two different regions (and two entry ids for one of
    /// them) and checks the batch append succeeds.
    #[tokio::test]
    async fn test_write_wal() {
        let env = WalEnv::new().await;
        let wal = env.new_wal();

        let entry = WalEntry {
            mutations: vec![
                new_mutation(OpType::Put, 1, &[("k1", 1), ("k2", 2)]),
                new_mutation(OpType::Put, 2, &[("k3", 3), ("k4", 4)]),
            ],
            bulk_entries: vec![],
        };
        let mut writer = wal.writer();
        let region_id = RegionId::new(1, 1);
        writer
            .add_entry(
                region_id,
                1,
                &entry,
                &Provider::raft_engine_provider(region_id.as_u64()),
            )
            .unwrap();
        let region_id = RegionId::new(1, 2);
        writer
            .add_entry(
                region_id,
                1,
                &entry,
                &Provider::raft_engine_provider(region_id.as_u64()),
            )
            .unwrap();
        // Second entry (id 2) for the same region (1, 2).
        let region_id = RegionId::new(1, 2);
        writer
            .add_entry(
                region_id,
                2,
                &entry,
                &Provider::raft_engine_provider(region_id.as_u64()),
            )
            .unwrap();

        writer.write_to_wal().await.unwrap();
    }

    /// Builds an Arrow record batch with a `(tag: Utf8, ts: timestamp-ms)`
    /// schema from `(tag, ts)` row tuples.
    fn build_record_batch(rows: &[(&str, i64)]) -> DfRecordBatch {
        let schema = Arc::new(arrow::datatypes::Schema::new(vec![
            Field::new("tag", arrow::datatypes::DataType::Utf8, false),
            Field::new(
                "ts",
                arrow::datatypes::DataType::Timestamp(
                    arrow::datatypes::TimeUnit::Millisecond,
                    None,
                ),
                false,
            ),
        ]));

        let tag = Arc::new(StringArray::from_iter_values(
            rows.iter().map(|r| r.0.to_string()),
        )) as ArrayRef;
        let ts = Arc::new(TimestampMillisecondArray::from_iter_values(
            rows.iter().map(|r| r.1),
        )) as ArrayRef;
        DfRecordBatch::try_new(schema, vec![tag, ts]).unwrap()
    }

    /// Builds a bulk WAL entry by Flight-encoding a record batch of `rows`.
    fn build_bulk_wal_entry(sequence_number: SequenceNumber, rows: &[(&str, i64)]) -> BulkWalEntry {
        let rb = build_record_batch(rows);
        let (schema, rb) = encode_to_flight_data(rb);
        // min/max timestamps are derived from the `ts` column values.
        let max_ts = rows.iter().map(|r| r.1).max().unwrap();
        let min_ts = rows.iter().map(|r| r.1).min().unwrap();
        BulkWalEntry {
            sequence: sequence_number,
            max_ts,
            min_ts,
            timestamp_index: 1,
            body: Some(bulk_wal_entry::Body::ArrowIpc(ArrowIpc {
                schema: schema.data_header,
                data_header: rb.data_header,
                payload: rb.data_body,
            })),
        }
    }

    /// Four sample WAL entries with increasing sequences; the last one also
    /// carries a bulk entry.
    fn sample_entries() -> Vec<WalEntry> {
        vec![
            WalEntry {
                mutations: vec![
                    new_mutation(OpType::Put, 1, &[("k1", 1), ("k2", 2)]),
                    new_mutation(OpType::Put, 2, &[("k3", 3), ("k4", 4)]),
                ],
                bulk_entries: vec![],
            },
            WalEntry {
                mutations: vec![new_mutation(OpType::Put, 3, &[("k1", 1), ("k2", 2)])],
                bulk_entries: vec![],
            },
            WalEntry {
                mutations: vec![
                    new_mutation(OpType::Put, 4, &[("k1", 1), ("k2", 2)]),
                    new_mutation(OpType::Put, 5, &[("k3", 3), ("k4", 4)]),
                ],
                bulk_entries: vec![],
            },
            WalEntry {
                mutations: vec![new_mutation(OpType::Put, 6, &[("k1", 1), ("k2", 2)])],
                bulk_entries: vec![build_bulk_wal_entry(7, &[("k1", 8), ("k2", 9)])],
            },
        ]
    }

    /// Asserts `actual` equals `expect` with entry ids starting at
    /// `expect_start_id` and increasing by one per entry.
    fn check_entries(
        expect: &[WalEntry],
        expect_start_id: EntryId,
        actual: &[(EntryId, WalEntry)],
    ) {
        for (idx, (expect_entry, (actual_id, actual_entry))) in
            expect.iter().zip(actual.iter()).enumerate()
        {
            let expect_id_entry = (expect_start_id + idx as u64, expect_entry);
            assert_eq!(expect_id_entry, (*actual_id, actual_entry));
        }
        // zip() truncates to the shorter side, so also compare lengths.
        assert_eq!(expect.len(), actual.len());
    }

    /// Scanning from various start ids returns exactly the expected suffix of
    /// the written entries (start id is inclusive).
    #[tokio::test]
    async fn test_scan_wal() {
        let env = WalEnv::new().await;
        let wal = env.new_wal();

        let entries = sample_entries();
        let (id1, id2) = (RegionId::new(1, 1), RegionId::new(1, 2));
        let ns1 = Provider::raft_engine_provider(id1.as_u64());
        let ns2 = Provider::raft_engine_provider(id2.as_u64());
        let mut writer = wal.writer();
        writer.add_entry(id1, 1, &entries[0], &ns1).unwrap();
        // An entry for another region; must not appear in id1's scan results.
        writer.add_entry(id2, 1, &entries[0], &ns2).unwrap();
        writer.add_entry(id1, 2, &entries[1], &ns1).unwrap();
        writer.add_entry(id1, 3, &entries[2], &ns1).unwrap();
        writer.add_entry(id1, 4, &entries[3], &ns1).unwrap();

        writer.write_to_wal().await.unwrap();

        // Scan from the beginning.
        let stream = wal.scan(id1, 1, &ns1).unwrap();
        let actual: Vec<_> = stream.try_collect().await.unwrap();
        check_entries(&entries, 1, &actual);

        // Scan from an intermediate entry id.
        let stream = wal.scan(id1, 2, &ns1).unwrap();
        let actual: Vec<_> = stream.try_collect().await.unwrap();
        check_entries(&entries[1..], 2, &actual);

        // Scan past the last entry id yields nothing.
        let stream = wal.scan(id1, 5, &ns1).unwrap();
        let actual: Vec<_> = stream.try_collect().await.unwrap();
        assert!(actual.is_empty());
    }

    /// Obsoleting up to id 2 leaves only entries with id > 2 visible to a
    /// subsequent scan.
    #[tokio::test]
    async fn test_obsolete_wal() {
        let env = WalEnv::new().await;
        let wal = env.new_wal();

        let entries = sample_entries();
        let mut writer = wal.writer();
        let region_id = RegionId::new(1, 1);
        let ns = Provider::raft_engine_provider(region_id.as_u64());
        writer.add_entry(region_id, 1, &entries[0], &ns).unwrap();
        writer.add_entry(region_id, 2, &entries[1], &ns).unwrap();
        writer.add_entry(region_id, 3, &entries[2], &ns).unwrap();

        writer.write_to_wal().await.unwrap();

        // Mark entries up to (and including) id 2 as obsolete.
        wal.obsolete(region_id, 2, &ns).await.unwrap();

        // Write another entry after truncation.
        let mut writer = wal.writer();
        writer.add_entry(region_id, 4, &entries[3], &ns).unwrap();
        writer.write_to_wal().await.unwrap();

        // Only entries 3 and 4 should remain.
        let stream = wal.scan(region_id, 1, &ns).unwrap();
        let actual: Vec<_> = stream.try_collect().await.unwrap();
        check_entries(&entries[2..], 3, &actual);
    }
}