1pub(crate) mod entry_distributor;
18pub(crate) mod entry_reader;
19pub(crate) mod raw_entry_reader;
20
21use std::collections::HashMap;
22use std::mem;
23use std::sync::Arc;
24
25use api::v1::WalEntry;
26use common_error::ext::BoxedError;
27use common_telemetry::debug;
28use entry_reader::NoopEntryReader;
29use futures::future::BoxFuture;
30use futures::stream::BoxStream;
31use prost::Message;
32use snafu::ResultExt;
33use store_api::logstore::entry::Entry;
34use store_api::logstore::provider::Provider;
35use store_api::logstore::{AppendBatchResponse, LogStore, WalIndex};
36use store_api::storage::RegionId;
37
38use crate::error::{BuildEntrySnafu, DeleteWalSnafu, Result, WriteWalSnafu};
39use crate::wal::entry_reader::{LogStoreEntryReader, WalEntryReader};
40use crate::wal::raw_entry_reader::{LogStoreRawEntryReader, RegionRawEntryReader};
41
/// Identifier of an entry in the log store; an alias of
/// [`store_api::logstore::entry::Id`].
pub type EntryId = store_api::logstore::entry::Id;
/// A boxed stream of WAL entries, each paired with its [`EntryId`].
///
/// Every item is a `Result`, so failures are reported per item while the stream is consumed.
pub type WalEntryStream<'a> = BoxStream<'a, Result<(EntryId, WalEntry)>>;
46
/// Write-ahead log backed by a shared log store `S`.
///
/// The store is held behind an [`Arc`], so cloning a `Wal` (or creating a
/// [`WalWriter`] from it) shares the same underlying store.
#[derive(Debug)]
pub struct Wal<S> {
    /// The underlying log store shared by all handles and writers.
    store: Arc<S>,
}
55
56impl<S> Wal<S> {
57 pub fn new(store: Arc<S>) -> Self {
59 Self { store }
60 }
61
62 pub fn store(&self) -> &Arc<S> {
63 &self.store
64 }
65}
66
67impl<S> Clone for Wal<S> {
68 fn clone(&self) -> Self {
69 Self {
70 store: self.store.clone(),
71 }
72 }
73}
74
75impl<S: LogStore> Wal<S> {
76 pub fn writer(&self) -> WalWriter<S> {
78 WalWriter {
79 store: self.store.clone(),
80 entries: Vec::new(),
81 providers: HashMap::new(),
82 }
83 }
84
85 pub(crate) fn on_region_opened(
87 &self,
88 ) -> impl FnOnce(RegionId, EntryId, &Provider) -> BoxFuture<Result<()>> {
89 let store = self.store.clone();
90 move |region_id, last_entry_id, provider| -> BoxFuture<'_, Result<()>> {
91 if let Provider::Noop = provider {
92 debug!("Skip obsolete for region: {}", region_id);
93 return Box::pin(async move { Ok(()) });
94 }
95 Box::pin(async move {
96 store
97 .obsolete(provider, region_id, last_entry_id)
98 .await
99 .map_err(BoxedError::new)
100 .context(DeleteWalSnafu { region_id })
101 })
102 }
103 }
104
105 pub(crate) fn wal_entry_reader(
107 &self,
108 provider: &Provider,
109 region_id: RegionId,
110 location_id: Option<u64>,
111 ) -> Box<dyn WalEntryReader> {
112 match provider {
113 Provider::RaftEngine(_) => Box::new(LogStoreEntryReader::new(
114 LogStoreRawEntryReader::new(self.store.clone()),
115 )),
116 Provider::Kafka(_) => {
117 let reader = if let Some(location_id) = location_id {
118 LogStoreRawEntryReader::new(self.store.clone())
119 .with_wal_index(WalIndex::new(region_id, location_id))
120 } else {
121 LogStoreRawEntryReader::new(self.store.clone())
122 };
123
124 Box::new(LogStoreEntryReader::new(RegionRawEntryReader::new(
125 reader, region_id,
126 )))
127 }
128 Provider::Noop => Box::new(NoopEntryReader),
129 }
130 }
131
132 pub fn scan<'a>(
135 &'a self,
136 region_id: RegionId,
137 start_id: EntryId,
138 provider: &'a Provider,
139 ) -> Result<WalEntryStream<'a>> {
140 match provider {
141 Provider::RaftEngine(_) => {
142 LogStoreEntryReader::new(LogStoreRawEntryReader::new(self.store.clone()))
143 .read(provider, start_id)
144 }
145 Provider::Kafka(_) => LogStoreEntryReader::new(RegionRawEntryReader::new(
146 LogStoreRawEntryReader::new(self.store.clone()),
147 region_id,
148 ))
149 .read(provider, start_id),
150 Provider::Noop => Ok(Box::pin(futures::stream::empty())),
151 }
152 }
153
154 pub async fn obsolete(
156 &self,
157 region_id: RegionId,
158 last_id: EntryId,
159 provider: &Provider,
160 ) -> Result<()> {
161 if let Provider::Noop = provider {
162 return Ok(());
163 }
164 self.store
165 .obsolete(provider, region_id, last_id)
166 .await
167 .map_err(BoxedError::new)
168 .context(DeleteWalSnafu { region_id })
169 }
170}
171
/// Buffers WAL entries, possibly for several regions, and appends them to the
/// log store in one batch via [`WalWriter::write_to_wal`].
pub struct WalWriter<S: LogStore> {
    /// Log store the buffered entries are appended to.
    store: Arc<S>,
    /// Entries added by `add_entry` that have not been written yet, in insertion order.
    entries: Vec<Entry>,
    /// Provider cached per region: the first provider passed for a region is
    /// kept and reused for that region's later entries.
    providers: HashMap<RegionId, Provider>,
}
181
182impl<S: LogStore> WalWriter<S> {
183 pub fn add_entry(
185 &mut self,
186 region_id: RegionId,
187 entry_id: EntryId,
188 wal_entry: &WalEntry,
189 provider: &Provider,
190 ) -> Result<()> {
191 let provider = self
193 .providers
194 .entry(region_id)
195 .or_insert_with(|| provider.clone());
196
197 let data = wal_entry.encode_to_vec();
198 let entry = self
199 .store
200 .entry(data, entry_id, region_id, provider)
201 .map_err(BoxedError::new)
202 .context(BuildEntrySnafu { region_id })?;
203
204 self.entries.push(entry);
205
206 Ok(())
207 }
208
209 pub async fn write_to_wal(&mut self) -> Result<AppendBatchResponse> {
211 let entries = mem::take(&mut self.entries);
214 self.store
215 .append_batch(entries)
216 .await
217 .map_err(BoxedError::new)
218 .context(WriteWalSnafu)
219 }
220}
221
#[cfg(test)]
mod tests {
    use api::v1::{
        bulk_wal_entry, value, ArrowIpc, BulkWalEntry, ColumnDataType, ColumnSchema, Mutation,
        OpType, Row, Rows, SemanticType, Value,
    };
    use common_recordbatch::DfRecordBatch;
    use common_test_util::flight::encode_to_flight_data;
    use common_test_util::temp_dir::{create_temp_dir, TempDir};
    use datatypes::arrow;
    use datatypes::arrow::array::{ArrayRef, TimestampMillisecondArray};
    use datatypes::arrow::datatypes::Field;
    use datatypes::arrow_array::StringArray;
    use futures::TryStreamExt;
    use log_store::raft_engine::log_store::RaftEngineLogStore;
    use log_store::test_util::log_store_util;
    use store_api::storage::SequenceNumber;

    use super::*;

    /// Test fixture: a temporary directory plus a raft-engine log store created in it.
    struct WalEnv {
        // Never read; held so the temp dir lives as long as the env
        // (presumably the directory is removed when `TempDir` is dropped).
        _wal_dir: TempDir,
        log_store: Option<Arc<RaftEngineLogStore>>,
    }

    impl WalEnv {
        /// Creates a temp dir and a local-file raft-engine log store inside it.
        async fn new() -> WalEnv {
            let wal_dir = create_temp_dir("");
            let log_store =
                log_store_util::create_tmp_local_file_log_store(wal_dir.path().to_str().unwrap())
                    .await;
            WalEnv {
                _wal_dir: wal_dir,
                log_store: Some(Arc::new(log_store)),
            }
        }

        /// Returns a `Wal` that shares this env's log store.
        fn new_wal(&self) -> Wal<RaftEngineLogStore> {
            let log_store = self.log_store.clone().unwrap();
            Wal::new(log_store)
        }
    }

    /// Builds a `Mutation` with one row per `(tag, ts)` pair.
    ///
    /// The rows use a string tag column `tag` and a millisecond timestamp column `ts`.
    fn new_mutation(op_type: OpType, sequence: SequenceNumber, rows: &[(&str, i64)]) -> Mutation {
        let rows = rows
            .iter()
            .map(|(str_col, int_col)| {
                let values = vec![
                    Value {
                        value_data: Some(value::ValueData::StringValue(str_col.to_string())),
                    },
                    Value {
                        value_data: Some(value::ValueData::TimestampMillisecondValue(*int_col)),
                    },
                ];
                Row { values }
            })
            .collect();
        let schema = vec![
            ColumnSchema {
                column_name: "tag".to_string(),
                datatype: ColumnDataType::String as i32,
                semantic_type: SemanticType::Tag as i32,
                ..Default::default()
            },
            ColumnSchema {
                column_name: "ts".to_string(),
                datatype: ColumnDataType::TimestampMillisecond as i32,
                semantic_type: SemanticType::Timestamp as i32,
                ..Default::default()
            },
        ];

        Mutation {
            op_type: op_type as i32,
            sequence,
            rows: Some(Rows { schema, rows }),
            write_hint: None,
        }
    }

    /// Writes entries for two regions in one batch.
    ///
    /// Region (1, 1) gets entry 1 and region (1, 2) gets entries 1 and 2.
    /// NOTE(review): only checks that the batch write succeeds; nothing is read back.
    #[tokio::test]
    async fn test_write_wal() {
        let env = WalEnv::new().await;
        let wal = env.new_wal();

        let entry = WalEntry {
            mutations: vec![
                new_mutation(OpType::Put, 1, &[("k1", 1), ("k2", 2)]),
                new_mutation(OpType::Put, 2, &[("k3", 3), ("k4", 4)]),
            ],
            bulk_entries: vec![],
        };
        let mut writer = wal.writer();
        let region_id = RegionId::new(1, 1);
        writer
            .add_entry(
                region_id,
                1,
                &entry,
                &Provider::raft_engine_provider(region_id.as_u64()),
            )
            .unwrap();
        let region_id = RegionId::new(1, 2);
        writer
            .add_entry(
                region_id,
                1,
                &entry,
                &Provider::raft_engine_provider(region_id.as_u64()),
            )
            .unwrap();
        let region_id = RegionId::new(1, 2);
        writer
            .add_entry(
                region_id,
                2,
                &entry,
                &Provider::raft_engine_provider(region_id.as_u64()),
            )
            .unwrap();

        writer.write_to_wal().await.unwrap();
    }

    /// Builds a two-column Arrow record batch (`tag`: Utf8, `ts`: millisecond
    /// timestamp) from `(tag, ts)` pairs.
    fn build_record_batch(rows: &[(&str, i64)]) -> DfRecordBatch {
        let schema = Arc::new(arrow::datatypes::Schema::new(vec![
            Field::new("tag", arrow::datatypes::DataType::Utf8, false),
            Field::new(
                "ts",
                arrow::datatypes::DataType::Timestamp(
                    arrow::datatypes::TimeUnit::Millisecond,
                    None,
                ),
                false,
            ),
        ]));

        let tag = Arc::new(StringArray::from_iter_values(
            rows.iter().map(|r| r.0.to_string()),
        )) as ArrayRef;
        let ts = Arc::new(TimestampMillisecondArray::from_iter_values(
            rows.iter().map(|r| r.1),
        )) as ArrayRef;
        DfRecordBatch::try_new(schema, vec![tag, ts]).unwrap()
    }

    /// Builds a `BulkWalEntry` whose body is the Arrow IPC flight encoding of
    /// `rows`, with `min_ts`/`max_ts` taken from the row timestamps.
    ///
    /// Panics if `rows` is empty (the min/max computation unwraps).
    fn build_bulk_wal_entry(sequence_number: SequenceNumber, rows: &[(&str, i64)]) -> BulkWalEntry {
        let rb = build_record_batch(rows);
        let (schema, rb) = encode_to_flight_data(rb);
        let max_ts = rows.iter().map(|r| r.1).max().unwrap();
        let min_ts = rows.iter().map(|r| r.1).min().unwrap();
        BulkWalEntry {
            sequence: sequence_number,
            max_ts,
            min_ts,
            // `ts` is column 1 in the schema built by `build_record_batch`.
            timestamp_index: 1,
            body: Some(bulk_wal_entry::Body::ArrowIpc(ArrowIpc {
                schema: schema.data_header,
                data_header: rb.data_header,
                payload: rb.data_body,
            })),
        }
    }

    /// Four sample WAL entries.
    ///
    /// The last one also carries a bulk entry, so bulk payloads are covered as well.
    fn sample_entries() -> Vec<WalEntry> {
        vec![
            WalEntry {
                mutations: vec![
                    new_mutation(OpType::Put, 1, &[("k1", 1), ("k2", 2)]),
                    new_mutation(OpType::Put, 2, &[("k3", 3), ("k4", 4)]),
                ],
                bulk_entries: vec![],
            },
            WalEntry {
                mutations: vec![new_mutation(OpType::Put, 3, &[("k1", 1), ("k2", 2)])],
                bulk_entries: vec![],
            },
            WalEntry {
                mutations: vec![
                    new_mutation(OpType::Put, 4, &[("k1", 1), ("k2", 2)]),
                    new_mutation(OpType::Put, 5, &[("k3", 3), ("k4", 4)]),
                ],
                bulk_entries: vec![],
            },
            WalEntry {
                mutations: vec![new_mutation(OpType::Put, 6, &[("k1", 1), ("k2", 2)])],
                bulk_entries: vec![build_bulk_wal_entry(7, &[("k1", 8), ("k2", 9)])],
            },
        ]
    }

    /// Asserts that `actual` is exactly `expect` with consecutive ids starting
    /// at `expect_start_id`.
    fn check_entries(
        expect: &[WalEntry],
        expect_start_id: EntryId,
        actual: &[(EntryId, WalEntry)],
    ) {
        for (idx, (expect_entry, (actual_id, actual_entry))) in
            expect.iter().zip(actual.iter()).enumerate()
        {
            let expect_id_entry = (expect_start_id + idx as u64, expect_entry);
            assert_eq!(expect_id_entry, (*actual_id, actual_entry));
        }
        // `zip` stops at the shorter side, so the lengths are compared separately.
        assert_eq!(expect.len(), actual.len());
    }

    /// Scans region (1, 1) from several start ids.
    ///
    /// Region (1, 2) also has an entry 1 in the same log store, yet the
    /// assertions show only region (1, 1)'s entries are returned.
    #[tokio::test]
    async fn test_scan_wal() {
        let env = WalEnv::new().await;
        let wal = env.new_wal();

        let entries = sample_entries();
        let (id1, id2) = (RegionId::new(1, 1), RegionId::new(1, 2));
        let ns1 = Provider::raft_engine_provider(id1.as_u64());
        let ns2 = Provider::raft_engine_provider(id2.as_u64());
        let mut writer = wal.writer();
        writer.add_entry(id1, 1, &entries[0], &ns1).unwrap();
        writer.add_entry(id2, 1, &entries[0], &ns2).unwrap();
        writer.add_entry(id1, 2, &entries[1], &ns1).unwrap();
        writer.add_entry(id1, 3, &entries[2], &ns1).unwrap();
        writer.add_entry(id1, 4, &entries[3], &ns1).unwrap();

        writer.write_to_wal().await.unwrap();

        // Starting at the first id returns all four entries.
        let stream = wal.scan(id1, 1, &ns1).unwrap();
        let actual: Vec<_> = stream.try_collect().await.unwrap();
        check_entries(&entries, 1, &actual);

        // The start id is inclusive: scanning from 2 returns ids 2..=4.
        let stream = wal.scan(id1, 2, &ns1).unwrap();
        let actual: Vec<_> = stream.try_collect().await.unwrap();
        check_entries(&entries[1..], 2, &actual);

        // Starting past the last written id (4) returns nothing.
        let stream = wal.scan(id1, 5, &ns1).unwrap();
        let actual: Vec<_> = stream.try_collect().await.unwrap();
        assert!(actual.is_empty());
    }

    /// Obsoletes entries up to id 2, writes one more entry, then scans from id 1.
    #[tokio::test]
    async fn test_obsolete_wal() {
        let env = WalEnv::new().await;
        let wal = env.new_wal();

        let entries = sample_entries();
        let mut writer = wal.writer();
        let region_id = RegionId::new(1, 1);
        let ns = Provider::raft_engine_provider(region_id.as_u64());
        writer.add_entry(region_id, 1, &entries[0], &ns).unwrap();
        writer.add_entry(region_id, 2, &entries[1], &ns).unwrap();
        writer.add_entry(region_id, 3, &entries[2], &ns).unwrap();

        writer.write_to_wal().await.unwrap();

        wal.obsolete(region_id, 2, &ns).await.unwrap();

        // Entries can still be appended after obsoleting.
        let mut writer = wal.writer();
        writer.add_entry(region_id, 4, &entries[3], &ns).unwrap();
        writer.write_to_wal().await.unwrap();

        // Ids 1 and 2 are gone: a scan from 1 only yields ids 3 and 4.
        let stream = wal.scan(region_id, 1, &ns).unwrap();
        let actual: Vec<_> = stream.try_collect().await.unwrap();
        check_entries(&entries[2..], 3, &actual);
    }
}