//! Write-ahead log (WAL) of the engine.

pub(crate) mod entry_distributor;
pub(crate) mod entry_reader;
pub(crate) mod raw_entry_reader;

use std::collections::HashMap;
use std::mem;
use std::sync::Arc;

use api::v1::WalEntry;
use common_error::ext::BoxedError;
use common_telemetry::debug;
use entry_reader::NoopEntryReader;
use futures::future::BoxFuture;
use futures::stream::BoxStream;
use prost::Message;
use snafu::ResultExt;
use store_api::logstore::entry::Entry;
use store_api::logstore::provider::Provider;
use store_api::logstore::{AppendBatchResponse, LogStore, WalIndex};
use store_api::storage::RegionId;

use crate::error::{BuildEntrySnafu, DeleteWalSnafu, Result, WriteWalSnafu};
use crate::wal::entry_reader::{LogStoreEntryReader, WalEntryReader};
use crate::wal::raw_entry_reader::{LogStoreRawEntryReader, RegionRawEntryReader};

/// WAL entry id.
pub type EntryId = store_api::logstore::entry::Id;
/// A stream of WAL entries paired with their entry ids.
pub type WalEntryStream<'a> = BoxStream<'a, Result<(EntryId, WalEntry)>>;

/// Write-ahead log of the engine.
#[derive(Debug)]
pub struct Wal<S> {
    /// The underlying log store.
    store: Arc<S>,
}

impl<S> Wal<S> {
    /// Creates a new [Wal] from the log store.
    pub fn new(store: Arc<S>) -> Self {
        Self { store }
    }

    /// Returns the underlying log store.
    pub fn store(&self) -> &Arc<S> {
        &self.store
    }
}

// `Clone` is implemented manually so that `S` itself does not need to be
// `Clone`; cloning a `Wal` only clones the shared `Arc` handle.
impl<S> Clone for Wal<S> {
    fn clone(&self) -> Self {
        Self {
            store: Arc::clone(&self.store),
        }
    }
}

impl<S: LogStore> Wal<S> {
    /// Returns a writer for batching entries and writing them to the WAL.
    pub fn writer(&self) -> WalWriter<S> {
        WalWriter {
            store: self.store.clone(),
            entries: Vec::new(),
            providers: HashMap::new(),
        }
    }

    /// Returns a callback invoked when a region is opened.
    ///
    /// The callback obsoletes the region's WAL entries up to `last_entry_id`
    /// (inclusive); regions using the noop provider skip the truncation.
    pub(crate) fn on_region_opened(
        &self,
    ) -> impl FnOnce(RegionId, EntryId, &Provider) -> BoxFuture<Result<()>> {
        let store = self.store.clone();
        move |region_id, last_entry_id, provider| -> BoxFuture<'_, Result<()>> {
            if let Provider::Noop = provider {
                debug!("Skip obsolete for region: {}", region_id);
                return Box::pin(async move { Ok(()) });
            }
            Box::pin(async move {
                store
                    .obsolete(provider, region_id, last_entry_id)
                    .await
                    .map_err(BoxedError::new)
                    .context(DeleteWalSnafu { region_id })
            })
        }
    }

    /// Returns a [WalEntryReader] for the given `provider`.
    ///
    /// For Kafka providers, `location_id` (if present) is used to build a
    /// [WalIndex] for the region.
    pub(crate) fn wal_entry_reader(
        &self,
        provider: &Provider,
        region_id: RegionId,
        location_id: Option<u64>,
    ) -> Box<dyn WalEntryReader> {
        match provider {
            Provider::RaftEngine(_) => Box::new(LogStoreEntryReader::new(
                LogStoreRawEntryReader::new(self.store.clone()),
            )),
            Provider::Kafka(_) => {
                let reader = if let Some(location_id) = location_id {
                    LogStoreRawEntryReader::new(self.store.clone())
                        .with_wal_index(WalIndex::new(region_id, location_id))
                } else {
                    LogStoreRawEntryReader::new(self.store.clone())
                };

                Box::new(LogStoreEntryReader::new(RegionRawEntryReader::new(
                    reader, region_id,
                )))
            }
            Provider::Noop => Box::new(NoopEntryReader),
        }
    }

    /// Scans entries of the region starting from `start_id` (inclusive).
    pub fn scan<'a>(
        &'a self,
        region_id: RegionId,
        start_id: EntryId,
        provider: &'a Provider,
    ) -> Result<WalEntryStream<'a>> {
        let mut reader = self.wal_entry_reader(provider, region_id, None);
        reader.read(provider, start_id)
    }

    /// Marks entries whose ids are less than or equal to `last_id` as obsolete
    /// so the log store can purge them. Does nothing for the noop provider.
    pub async fn obsolete(
        &self,
        region_id: RegionId,
        last_id: EntryId,
        provider: &Provider,
    ) -> Result<()> {
        if let Provider::Noop = provider {
            return Ok(());
        }
        self.store
            .obsolete(provider, region_id, last_id)
            .await
            .map_err(BoxedError::new)
            .context(DeleteWalSnafu { region_id })
    }
}
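
// A minimal sketch of the replay-then-truncate flow built on `scan` and
// `obsolete`. This is illustrative only; `replay` is a hypothetical helper,
// and applying mutations to a region is elided:
//
//     use futures::TryStreamExt;
//
//     async fn replay<S: LogStore>(
//         wal: &Wal<S>,
//         region_id: RegionId,
//         provider: &Provider,
//         start_id: EntryId,
//     ) -> Result<()> {
//         let mut stream = wal.scan(region_id, start_id, provider)?;
//         let mut last_entry_id = None;
//         while let Some((entry_id, _wal_entry)) = stream.try_next().await? {
//             // Apply `_wal_entry.mutations` to the region here.
//             last_entry_id = Some(entry_id);
//         }
//         if let Some(last_entry_id) = last_entry_id {
//             // Replayed entries are no longer needed; let the store purge them.
//             wal.obsolete(region_id, last_entry_id, provider).await?;
//         }
//         Ok(())
//     }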

/// A batch writer that buffers WAL entries and writes them in one call.
pub struct WalWriter<S: LogStore> {
    /// Log store of the WAL.
    store: Arc<S>,
    /// Buffered entries to write.
    entries: Vec<Entry>,
    /// Providers of the regions being written to.
    providers: HashMap<RegionId, Provider>,
}

impl<S: LogStore> WalWriter<S> {
    /// Adds a WAL entry of the region to the pending batch.
    pub fn add_entry(
        &mut self,
        region_id: RegionId,
        entry_id: EntryId,
        wal_entry: &WalEntry,
        provider: &Provider,
    ) -> Result<()> {
        // Caches the provider of the region on first use.
        let provider = self
            .providers
            .entry(region_id)
            .or_insert_with(|| provider.clone());

        let data = wal_entry.encode_to_vec();
        let entry = self
            .store
            .entry(data, entry_id, region_id, provider)
            .map_err(BoxedError::new)
            .context(BuildEntrySnafu { region_id })?;

        self.entries.push(entry);

        Ok(())
    }

    /// Writes all buffered entries to the log store in one batch and clears
    /// the buffer.
    pub async fn write_to_wal(&mut self) -> Result<AppendBatchResponse> {
        let entries = mem::take(&mut self.entries);
        self.store
            .append_batch(entries)
            .await
            .map_err(BoxedError::new)
            .context(WriteWalSnafu)
    }
}
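
// A minimal sketch of the batched write path. `write_two_regions` is a
// hypothetical helper and the entry ids are illustrative; entries for several
// regions can be buffered into one `WalWriter` and flushed with a single
// `write_to_wal` call (see `test_write_wal` below):
//
//     async fn write_two_regions<S: LogStore>(
//         wal: &Wal<S>,
//         wal_entry: &WalEntry,
//     ) -> Result<AppendBatchResponse> {
//         let mut writer = wal.writer();
//         for region_number in 1..=2 {
//             let region_id = RegionId::new(1, region_number);
//             let provider = Provider::raft_engine_provider(region_id.as_u64());
//             writer.add_entry(region_id, 1, wal_entry, &provider)?;
//         }
//         writer.write_to_wal().await
//     }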

#[cfg(test)]
mod tests {
    use api::v1::helper::{tag_column_schema, time_index_column_schema};
    use api::v1::{
        ArrowIpc, BulkWalEntry, ColumnDataType, Mutation, OpType, Row, Rows, Value, bulk_wal_entry,
        value,
    };
    use common_recordbatch::DfRecordBatch;
    use common_test_util::flight::encode_to_flight_data;
    use common_test_util::temp_dir::{TempDir, create_temp_dir};
    use datatypes::arrow;
    use datatypes::arrow::array::{ArrayRef, TimestampMillisecondArray};
    use datatypes::arrow::datatypes::Field;
    use datatypes::arrow_array::StringArray;
    use futures::TryStreamExt;
    use log_store::raft_engine::log_store::RaftEngineLogStore;
    use log_store::test_util::log_store_util;
    use store_api::storage::SequenceNumber;

    use super::*;

    struct WalEnv {
        _wal_dir: TempDir,
        log_store: Option<Arc<RaftEngineLogStore>>,
    }

    impl WalEnv {
        async fn new() -> WalEnv {
            let wal_dir = create_temp_dir("");
            let log_store =
                log_store_util::create_tmp_local_file_log_store(wal_dir.path().to_str().unwrap())
                    .await;
            WalEnv {
                _wal_dir: wal_dir,
                log_store: Some(Arc::new(log_store)),
            }
        }

        fn new_wal(&self) -> Wal<RaftEngineLogStore> {
            let log_store = self.log_store.clone().unwrap();
            Wal::new(log_store)
        }
    }

    /// Creates a mutation whose rows are `(tag, timestamp)` pairs.
    fn new_mutation(op_type: OpType, sequence: SequenceNumber, rows: &[(&str, i64)]) -> Mutation {
        let rows = rows
            .iter()
            .map(|(str_col, int_col)| {
                let values = vec![
                    Value {
                        value_data: Some(value::ValueData::StringValue(str_col.to_string())),
                    },
                    Value {
                        value_data: Some(value::ValueData::TimestampMillisecondValue(*int_col)),
                    },
                ];
                Row { values }
            })
            .collect();
        let schema = vec![
            tag_column_schema("tag", ColumnDataType::String),
            time_index_column_schema("ts", ColumnDataType::TimestampMillisecond),
        ];

        Mutation {
            op_type: op_type as i32,
            sequence,
            rows: Some(Rows { schema, rows }),
            write_hint: None,
        }
    }

    #[tokio::test]
    async fn test_write_wal() {
        let env = WalEnv::new().await;
        let wal = env.new_wal();

        let entry = WalEntry {
            mutations: vec![
                new_mutation(OpType::Put, 1, &[("k1", 1), ("k2", 2)]),
                new_mutation(OpType::Put, 2, &[("k3", 3), ("k4", 4)]),
            ],
            bulk_entries: vec![],
        };
        let mut writer = wal.writer();
        let region_id = RegionId::new(1, 1);
        writer
            .add_entry(
                region_id,
                1,
                &entry,
                &Provider::raft_engine_provider(region_id.as_u64()),
            )
            .unwrap();
        let region_id = RegionId::new(1, 2);
        writer
            .add_entry(
                region_id,
                1,
                &entry,
                &Provider::raft_engine_provider(region_id.as_u64()),
            )
            .unwrap();
        let region_id = RegionId::new(1, 2);
        writer
            .add_entry(
                region_id,
                2,
                &entry,
                &Provider::raft_engine_provider(region_id.as_u64()),
            )
            .unwrap();

        writer.write_to_wal().await.unwrap();
    }

    fn build_record_batch(rows: &[(&str, i64)]) -> DfRecordBatch {
        let schema = Arc::new(arrow::datatypes::Schema::new(vec![
            Field::new("tag", arrow::datatypes::DataType::Utf8, false),
            Field::new(
                "ts",
                arrow::datatypes::DataType::Timestamp(
                    arrow::datatypes::TimeUnit::Millisecond,
                    None,
                ),
                false,
            ),
        ]));

        let tag = Arc::new(StringArray::from_iter_values(
            rows.iter().map(|r| r.0.to_string()),
        )) as ArrayRef;
        let ts = Arc::new(TimestampMillisecondArray::from_iter_values(
            rows.iter().map(|r| r.1),
        )) as ArrayRef;
        DfRecordBatch::try_new(schema, vec![tag, ts]).unwrap()
    }

    fn build_bulk_wal_entry(sequence_number: SequenceNumber, rows: &[(&str, i64)]) -> BulkWalEntry {
        let rb = build_record_batch(rows);
        let (schema, rb) = encode_to_flight_data(rb);
        let max_ts = rows.iter().map(|r| r.1).max().unwrap();
        let min_ts = rows.iter().map(|r| r.1).min().unwrap();
        BulkWalEntry {
            sequence: sequence_number,
            max_ts,
            min_ts,
            timestamp_index: 1,
            body: Some(bulk_wal_entry::Body::ArrowIpc(ArrowIpc {
                schema: schema.data_header,
                data_header: rb.data_header,
                payload: rb.data_body,
            })),
        }
    }

    fn sample_entries() -> Vec<WalEntry> {
        vec![
            WalEntry {
                mutations: vec![
                    new_mutation(OpType::Put, 1, &[("k1", 1), ("k2", 2)]),
                    new_mutation(OpType::Put, 2, &[("k3", 3), ("k4", 4)]),
                ],
                bulk_entries: vec![],
            },
            WalEntry {
                mutations: vec![new_mutation(OpType::Put, 3, &[("k1", 1), ("k2", 2)])],
                bulk_entries: vec![],
            },
            WalEntry {
                mutations: vec![
                    new_mutation(OpType::Put, 4, &[("k1", 1), ("k2", 2)]),
                    new_mutation(OpType::Put, 5, &[("k3", 3), ("k4", 4)]),
                ],
                bulk_entries: vec![],
            },
            WalEntry {
                mutations: vec![new_mutation(OpType::Put, 6, &[("k1", 1), ("k2", 2)])],
                bulk_entries: vec![build_bulk_wal_entry(7, &[("k1", 8), ("k2", 9)])],
            },
        ]
    }

    fn check_entries(
        expect: &[WalEntry],
        expect_start_id: EntryId,
        actual: &[(EntryId, WalEntry)],
    ) {
        for (idx, (expect_entry, (actual_id, actual_entry))) in
            expect.iter().zip(actual.iter()).enumerate()
        {
            let expect_id_entry = (expect_start_id + idx as u64, expect_entry);
            assert_eq!(expect_id_entry, (*actual_id, actual_entry));
        }
        assert_eq!(expect.len(), actual.len());
    }

    #[tokio::test]
    async fn test_scan_wal() {
        let env = WalEnv::new().await;
        let wal = env.new_wal();

        let entries = sample_entries();
        let (id1, id2) = (RegionId::new(1, 1), RegionId::new(1, 2));
        let ns1 = Provider::raft_engine_provider(id1.as_u64());
        let ns2 = Provider::raft_engine_provider(id2.as_u64());
        let mut writer = wal.writer();
        writer.add_entry(id1, 1, &entries[0], &ns1).unwrap();
        writer.add_entry(id2, 1, &entries[0], &ns2).unwrap();
        writer.add_entry(id1, 2, &entries[1], &ns1).unwrap();
        writer.add_entry(id1, 3, &entries[2], &ns1).unwrap();
        writer.add_entry(id1, 4, &entries[3], &ns1).unwrap();

        writer.write_to_wal().await.unwrap();

        // Scans all entries of region 1.
        let stream = wal.scan(id1, 1, &ns1).unwrap();
        let actual: Vec<_> = stream.try_collect().await.unwrap();
        check_entries(&entries, 1, &actual);

        // Scans from entry id 2 (inclusive).
        let stream = wal.scan(id1, 2, &ns1).unwrap();
        let actual: Vec<_> = stream.try_collect().await.unwrap();
        check_entries(&entries[1..], 2, &actual);

        // Scanning past the last written entry id yields nothing.
        let stream = wal.scan(id1, 5, &ns1).unwrap();
        let actual: Vec<_> = stream.try_collect().await.unwrap();
        assert!(actual.is_empty());
    }

450
451 #[tokio::test]
452 async fn test_obsolete_wal() {
453 let env = WalEnv::new().await;
454 let wal = env.new_wal();
455
456 let entries = sample_entries();
457 let mut writer = wal.writer();
458 let region_id = RegionId::new(1, 1);
459 let ns = Provider::raft_engine_provider(region_id.as_u64());
460 writer.add_entry(region_id, 1, &entries[0], &ns).unwrap();
461 writer.add_entry(region_id, 2, &entries[1], &ns).unwrap();
462 writer.add_entry(region_id, 3, &entries[2], &ns).unwrap();
463
464 writer.write_to_wal().await.unwrap();
465
466 wal.obsolete(region_id, 2, &ns).await.unwrap();
468
469 let mut writer = wal.writer();
471 writer.add_entry(region_id, 4, &entries[3], &ns).unwrap();
472 writer.write_to_wal().await.unwrap();
473
474 let stream = wal.scan(region_id, 1, &ns).unwrap();
476 let actual: Vec<_> = stream.try_collect().await.unwrap();
477 check_entries(&entries[2..], 3, &actual);
478 }
479}