1use std::collections::{hash_map, HashMap};
16use std::fmt::{Debug, Formatter};
17use std::sync::Arc;
18use std::time::Duration;
19
20use async_stream::stream;
21use common_runtime::{RepeatedTask, TaskFunction};
22use common_telemetry::{debug, error, info};
23use common_wal::config::raft_engine::RaftEngineConfig;
24use raft_engine::{Config, Engine, LogBatch, MessageExt, ReadableSize, RecoveryMode};
25use snafu::{ensure, OptionExt, ResultExt};
26use store_api::logstore::entry::{Entry, Id as EntryId, NaiveEntry};
27use store_api::logstore::provider::{Provider, RaftEngineProvider};
28use store_api::logstore::{AppendBatchResponse, LogStore, SendableEntryStream, WalIndex};
29use store_api::storage::RegionId;
30
31use crate::error::{
32 AddEntryLogBatchSnafu, DiscontinuousLogIndexSnafu, Error, FetchEntrySnafu,
33 IllegalNamespaceSnafu, IllegalStateSnafu, InvalidProviderSnafu, OverrideCompactedEntrySnafu,
34 RaftEngineSnafu, Result, StartWalTaskSnafu, StopWalTaskSnafu,
35};
36use crate::metrics;
37use crate::raft_engine::backend::SYSTEM_NAMESPACE;
38use crate::raft_engine::protos::logstore::{EntryImpl, NamespaceImpl};
39
40const NAMESPACE_PREFIX: &str = "$sys/";
41
42pub struct RaftEngineLogStore {
43 sync_write: bool,
44 sync_period: Option<Duration>,
45 read_batch_size: usize,
46 engine: Arc<Engine>,
47 gc_task: RepeatedTask<Error>,
48 sync_task: RepeatedTask<Error>,
49}
50
51pub struct PurgeExpiredFilesFunction {
52 pub engine: Arc<Engine>,
53}
54
55#[async_trait::async_trait]
56impl TaskFunction<Error> for PurgeExpiredFilesFunction {
57 fn name(&self) -> &str {
58 "RaftEngineLogStore-gc-task"
59 }
60
61 async fn call(&mut self) -> Result<()> {
62 match self.engine.purge_expired_files().context(RaftEngineSnafu) {
63 Ok(res) => {
64 let log_string = format!(
67 "Successfully purged logstore files, namespaces need compaction: {:?}",
68 res
69 );
70 if res.is_empty() {
71 debug!(log_string);
72 } else {
73 info!(log_string);
74 }
75 }
76 Err(e) => {
77 error!(e; "Failed to purge files in logstore");
78 }
79 }
80
81 Ok(())
82 }
83}
84
85pub struct SyncWalTaskFunction {
86 engine: Arc<Engine>,
87}
88
89#[async_trait::async_trait]
90impl TaskFunction<Error> for SyncWalTaskFunction {
91 async fn call(&mut self) -> std::result::Result<(), Error> {
92 let engine = self.engine.clone();
93 if let Err(e) = tokio::task::spawn_blocking(move || engine.sync()).await {
94 error!(e; "Failed to sync raft engine log files");
95 };
96 Ok(())
97 }
98
99 fn name(&self) -> &str {
100 "SyncWalTaskFunction"
101 }
102}
103
104impl SyncWalTaskFunction {
105 pub fn new(engine: Arc<Engine>) -> Self {
106 Self { engine }
107 }
108}
109
110impl RaftEngineLogStore {
111 pub async fn try_new(dir: String, config: &RaftEngineConfig) -> Result<Self> {
112 let raft_engine_config = Config {
113 dir,
114 purge_threshold: ReadableSize(config.purge_threshold.0),
115 recovery_mode: RecoveryMode::TolerateTailCorruption,
116 batch_compression_threshold: ReadableSize::kb(8),
117 target_file_size: ReadableSize(config.file_size.0),
118 enable_log_recycle: config.enable_log_recycle,
119 prefill_for_recycle: config.prefill_log_files,
120 recovery_threads: config.recovery_parallelism,
121 ..Default::default()
122 };
123 let engine = Arc::new(Engine::open(raft_engine_config).context(RaftEngineSnafu)?);
124 let gc_task = RepeatedTask::new(
125 config.purge_interval,
126 Box::new(PurgeExpiredFilesFunction {
127 engine: engine.clone(),
128 }),
129 );
130
131 let sync_task = RepeatedTask::new(
132 config.sync_period.unwrap_or(Duration::from_secs(5)),
133 Box::new(SyncWalTaskFunction::new(engine.clone())),
134 );
135
136 let log_store = Self {
137 sync_write: config.sync_write,
138 sync_period: config.sync_period,
139 read_batch_size: config.read_batch_size,
140 engine,
141 gc_task,
142 sync_task,
143 };
144 log_store.start()?;
145 Ok(log_store)
146 }
147
148 pub fn started(&self) -> bool {
149 self.gc_task.started()
150 }
151
152 fn start(&self) -> Result<()> {
153 self.gc_task
154 .start(common_runtime::global_runtime())
155 .context(StartWalTaskSnafu { name: "gc_task" })?;
156 self.sync_task
157 .start(common_runtime::global_runtime())
158 .context(StartWalTaskSnafu { name: "sync_task" })
159 }
160
161 fn span(&self, provider: &RaftEngineProvider) -> (Option<u64>, Option<u64>) {
162 (
163 self.engine.first_index(provider.id),
164 self.engine.last_index(provider.id),
165 )
166 }
167
168 fn entries_to_batch(
172 &self,
173 entries: Vec<Entry>,
174 ) -> Result<(LogBatch, HashMap<RegionId, EntryId>)> {
175 let mut entry_ids: HashMap<RegionId, EntryId> = HashMap::with_capacity(entries.len());
177 let mut batch = LogBatch::with_capacity(entries.len());
178
179 for e in entries {
180 let region_id = e.region_id();
181 let entry_id = e.entry_id();
182 match entry_ids.entry(region_id) {
183 hash_map::Entry::Occupied(mut o) => {
184 let prev = *o.get();
185 ensure!(
186 entry_id == prev + 1,
187 DiscontinuousLogIndexSnafu {
188 region_id,
189 last_index: prev,
190 attempt_index: entry_id
191 }
192 );
193 o.insert(entry_id);
194 }
195 hash_map::Entry::Vacant(v) => {
196 if let Some(first_index) = self.engine.first_index(region_id.as_u64()) {
198 ensure!(
200 entry_id > first_index,
201 OverrideCompactedEntrySnafu {
202 namespace: region_id,
203 first_index,
204 attempt_index: entry_id,
205 }
206 );
207 }
208 if let Some(last_index) = self.engine.last_index(region_id.as_u64()) {
210 ensure!(
211 entry_id == last_index + 1,
212 DiscontinuousLogIndexSnafu {
213 region_id,
214 last_index,
215 attempt_index: entry_id
216 }
217 );
218 }
219 v.insert(entry_id);
220 }
221 }
222 batch
223 .add_entries::<MessageType>(
224 region_id.as_u64(),
225 &[EntryImpl {
226 id: entry_id,
227 namespace_id: region_id.as_u64(),
228 data: e.into_bytes(),
229 ..Default::default()
230 }],
231 )
232 .context(AddEntryLogBatchSnafu)?;
233 }
234
235 Ok((batch, entry_ids))
236 }
237}
238
239impl Debug for RaftEngineLogStore {
240 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
241 f.debug_struct("RaftEngineLogsStore")
242 .field("sync_write", &self.sync_write)
243 .field("sync_period", &self.sync_period)
244 .field("read_batch_size", &self.read_batch_size)
245 .field("started", &self.gc_task.started())
246 .finish()
247 }
248}
249
250#[async_trait::async_trait]
251impl LogStore for RaftEngineLogStore {
252 type Error = Error;
253
254 async fn stop(&self) -> Result<()> {
255 self.gc_task
256 .stop()
257 .await
258 .context(StopWalTaskSnafu { name: "gc_task" })?;
259 self.sync_task
260 .stop()
261 .await
262 .context(StopWalTaskSnafu { name: "sync_task" })
263 }
264
265 async fn append_batch(&self, entries: Vec<Entry>) -> Result<AppendBatchResponse> {
268 metrics::METRIC_RAFT_ENGINE_APPEND_BATCH_BYTES_TOTAL.inc_by(
269 entries
270 .iter()
271 .map(|entry| entry.estimated_size())
272 .sum::<usize>() as u64,
273 );
274 let _timer = metrics::METRIC_RAFT_ENGINE_APPEND_BATCH_ELAPSED.start_timer();
275
276 ensure!(self.started(), IllegalStateSnafu);
277 if entries.is_empty() {
278 return Ok(AppendBatchResponse::default());
279 }
280
281 let (mut batch, last_entry_ids) = self.entries_to_batch(entries)?;
282 let _ = self
283 .engine
284 .write(&mut batch, self.sync_write)
285 .context(RaftEngineSnafu)?;
286
287 Ok(AppendBatchResponse { last_entry_ids })
288 }
289
290 async fn read(
293 &self,
294 provider: &Provider,
295 entry_id: EntryId,
296 _index: Option<WalIndex>,
297 ) -> Result<SendableEntryStream<'static, Entry, Self::Error>> {
298 let ns = provider
299 .as_raft_engine_provider()
300 .with_context(|| InvalidProviderSnafu {
301 expected: RaftEngineProvider::type_name(),
302 actual: provider.type_name(),
303 })?;
304 let namespace_id = ns.id;
305 let _timer = metrics::METRIC_RAFT_ENGINE_READ_ELAPSED.start_timer();
306
307 ensure!(self.started(), IllegalStateSnafu);
308 let engine = self.engine.clone();
309
310 let last_index = engine.last_index(namespace_id).unwrap_or(0);
311 let mut start_index =
312 entry_id.max(engine.first_index(namespace_id).unwrap_or(last_index + 1));
313
314 info!(
315 "Read logstore, namespace: {}, start: {}, span: {:?}",
316 namespace_id,
317 entry_id,
318 self.span(ns)
319 );
320 let max_batch_size = self.read_batch_size;
321 let (tx, mut rx) = tokio::sync::mpsc::channel(max_batch_size);
322 let _handle = common_runtime::spawn_global(async move {
323 while start_index <= last_index {
324 let mut vec = Vec::with_capacity(max_batch_size);
325 match engine
326 .fetch_entries_to::<MessageType>(
327 namespace_id,
328 start_index,
329 last_index + 1,
330 Some(max_batch_size),
331 &mut vec,
332 )
333 .context(FetchEntrySnafu {
334 ns: namespace_id,
335 start: start_index,
336 end: last_index,
337 max_size: max_batch_size,
338 }) {
339 Ok(_) => {
340 if let Some(last_entry) = vec.last() {
341 start_index = last_entry.id + 1;
342 }
343 if tx.send(Ok(vec)).await.is_err() {
345 break;
346 }
347 }
348 Err(e) => {
349 let _ = tx.send(Err(e)).await;
350 break;
351 }
352 }
353 }
354 });
355
356 let s = stream!({
357 while let Some(res) = rx.recv().await {
358 let res = res?;
359
360 yield Ok(res.into_iter().map(Entry::from).collect::<Vec<_>>());
361 }
362 });
363 Ok(Box::pin(s))
364 }
365
366 async fn create_namespace(&self, ns: &Provider) -> Result<()> {
367 let ns = ns
368 .as_raft_engine_provider()
369 .with_context(|| InvalidProviderSnafu {
370 expected: RaftEngineProvider::type_name(),
371 actual: ns.type_name(),
372 })?;
373 let namespace_id = ns.id;
374 ensure!(
375 namespace_id != SYSTEM_NAMESPACE,
376 IllegalNamespaceSnafu { ns: namespace_id }
377 );
378 ensure!(self.started(), IllegalStateSnafu);
379 let key = format!("{}{}", NAMESPACE_PREFIX, namespace_id)
380 .as_bytes()
381 .to_vec();
382 let mut batch = LogBatch::with_capacity(1);
383 batch
384 .put_message::<NamespaceImpl>(
385 SYSTEM_NAMESPACE,
386 key,
387 &NamespaceImpl {
388 id: namespace_id,
389 ..Default::default()
390 },
391 )
392 .context(RaftEngineSnafu)?;
393 let _ = self
394 .engine
395 .write(&mut batch, true)
396 .context(RaftEngineSnafu)?;
397 Ok(())
398 }
399
400 async fn delete_namespace(&self, ns: &Provider) -> Result<()> {
401 let ns = ns
402 .as_raft_engine_provider()
403 .with_context(|| InvalidProviderSnafu {
404 expected: RaftEngineProvider::type_name(),
405 actual: ns.type_name(),
406 })?;
407 let namespace_id = ns.id;
408 ensure!(
409 namespace_id != SYSTEM_NAMESPACE,
410 IllegalNamespaceSnafu { ns: namespace_id }
411 );
412 ensure!(self.started(), IllegalStateSnafu);
413 let key = format!("{}{}", NAMESPACE_PREFIX, namespace_id)
414 .as_bytes()
415 .to_vec();
416 let mut batch = LogBatch::with_capacity(1);
417 batch.delete(SYSTEM_NAMESPACE, key);
418 let _ = self
419 .engine
420 .write(&mut batch, true)
421 .context(RaftEngineSnafu)?;
422 Ok(())
423 }
424
425 async fn list_namespaces(&self) -> Result<Vec<Provider>> {
426 ensure!(self.started(), IllegalStateSnafu);
427 let mut namespaces: Vec<Provider> = vec![];
428 self.engine
429 .scan_messages::<NamespaceImpl, _>(
430 SYSTEM_NAMESPACE,
431 Some(NAMESPACE_PREFIX.as_bytes()),
432 None,
433 false,
434 |_, v| {
435 namespaces.push(Provider::RaftEngine(RaftEngineProvider { id: v.id }));
436 true
437 },
438 )
439 .context(RaftEngineSnafu)?;
440 Ok(namespaces)
441 }
442
443 fn entry(
444 &self,
445 data: &mut Vec<u8>,
446 entry_id: EntryId,
447 region_id: RegionId,
448 provider: &Provider,
449 ) -> Result<Entry> {
450 debug_assert_eq!(
451 provider.as_raft_engine_provider().unwrap().id,
452 region_id.as_u64()
453 );
454 Ok(Entry::Naive(NaiveEntry {
455 provider: provider.clone(),
456 region_id,
457 entry_id,
458 data: std::mem::take(data),
459 }))
460 }
461
462 async fn obsolete(
463 &self,
464 provider: &Provider,
465 _region_id: RegionId,
466 entry_id: EntryId,
467 ) -> Result<()> {
468 let ns = provider
469 .as_raft_engine_provider()
470 .with_context(|| InvalidProviderSnafu {
471 expected: RaftEngineProvider::type_name(),
472 actual: provider.type_name(),
473 })?;
474 let namespace_id = ns.id;
475 ensure!(self.started(), IllegalStateSnafu);
476 let obsoleted = self.engine.compact_to(namespace_id, entry_id + 1);
477 info!(
478 "Namespace {} obsoleted {} entries, compacted index: {}, span: {:?}",
479 namespace_id,
480 obsoleted,
481 entry_id,
482 self.span(ns)
483 );
484 Ok(())
485 }
486
487 fn high_watermark(&self, provider: &Provider) -> Result<EntryId> {
488 let ns = provider
489 .as_raft_engine_provider()
490 .with_context(|| InvalidProviderSnafu {
491 expected: RaftEngineProvider::type_name(),
492 actual: provider.type_name(),
493 })?;
494 let namespace_id = ns.id;
495 let last_index = self.engine.last_index(namespace_id).unwrap_or(0);
496 Ok(last_index)
497 }
498}
499
500#[derive(Debug, Clone)]
501struct MessageType;
502
503impl MessageExt for MessageType {
504 type Entry = EntryImpl;
505
506 fn index(e: &Self::Entry) -> u64 {
507 e.id
508 }
509}
510
511#[cfg(test)]
512impl RaftEngineLogStore {
513 async fn append(&self, entry: Entry) -> Result<store_api::logstore::AppendResponse> {
516 let response = self.append_batch(vec![entry]).await?;
517 if let Some((_, last_entry_id)) = response.last_entry_ids.into_iter().next() {
518 return Ok(store_api::logstore::AppendResponse { last_entry_id });
519 }
520 unreachable!()
521 }
522}
523
524#[cfg(test)]
525mod tests {
526 use std::collections::HashSet;
527 use std::time::Duration;
528
529 use common_base::readable_size::ReadableSize;
530 use common_telemetry::debug;
531 use common_test_util::temp_dir::{create_temp_dir, TempDir};
532 use futures_util::StreamExt;
533 use store_api::logstore::{LogStore, SendableEntryStream};
534
535 use super::*;
536 use crate::error::Error;
537 use crate::raft_engine::log_store::RaftEngineLogStore;
538 use crate::raft_engine::protos::logstore::EntryImpl;
539
540 #[tokio::test]
541 async fn test_open_logstore() {
542 let dir = create_temp_dir("raft-engine-logstore-test");
543 let logstore = RaftEngineLogStore::try_new(
544 dir.path().to_str().unwrap().to_string(),
545 &RaftEngineConfig::default(),
546 )
547 .await
548 .unwrap();
549 let namespaces = logstore.list_namespaces().await.unwrap();
550 assert_eq!(0, namespaces.len());
551 }
552
553 #[tokio::test]
554 async fn test_manage_namespace() {
555 let dir = create_temp_dir("raft-engine-logstore-test");
556 let logstore = RaftEngineLogStore::try_new(
557 dir.path().to_str().unwrap().to_string(),
558 &RaftEngineConfig::default(),
559 )
560 .await
561 .unwrap();
562 assert!(logstore.list_namespaces().await.unwrap().is_empty());
563
564 logstore
565 .create_namespace(&Provider::raft_engine_provider(42))
566 .await
567 .unwrap();
568 let namespaces = logstore.list_namespaces().await.unwrap();
569 assert_eq!(1, namespaces.len());
570 assert_eq!(Provider::raft_engine_provider(42), namespaces[0]);
571
572 logstore
573 .delete_namespace(&Provider::raft_engine_provider(42))
574 .await
575 .unwrap();
576 assert!(logstore.list_namespaces().await.unwrap().is_empty());
577 }
578
579 #[tokio::test]
580 async fn test_append_and_read() {
581 let dir = create_temp_dir("raft-engine-logstore-test");
582 let logstore = RaftEngineLogStore::try_new(
583 dir.path().to_str().unwrap().to_string(),
584 &RaftEngineConfig::default(),
585 )
586 .await
587 .unwrap();
588
589 let namespace_id = 1;
590 let cnt = 1024;
591 for i in 0..cnt {
592 let response = logstore
593 .append(
594 EntryImpl::create(i, namespace_id, i.to_string().as_bytes().to_vec()).into(),
595 )
596 .await
597 .unwrap();
598 assert_eq!(i, response.last_entry_id);
599 }
600 let mut entries = HashSet::with_capacity(1024);
601 let mut s = logstore
602 .read(&Provider::raft_engine_provider(1), 0, None)
603 .await
604 .unwrap();
605 while let Some(r) = s.next().await {
606 let vec = r.unwrap();
607 entries.extend(vec.into_iter().map(|e| e.entry_id()));
608 }
609 assert_eq!((0..cnt).collect::<HashSet<_>>(), entries);
610 }
611
612 async fn collect_entries(mut s: SendableEntryStream<'_, Entry, Error>) -> Vec<Entry> {
613 let mut res = vec![];
614 while let Some(r) = s.next().await {
615 res.extend(r.unwrap());
616 }
617 res
618 }
619
620 #[tokio::test]
621 async fn test_reopen() {
622 let dir = create_temp_dir("raft-engine-logstore-reopen-test");
623 {
624 let logstore = RaftEngineLogStore::try_new(
625 dir.path().to_str().unwrap().to_string(),
626 &RaftEngineConfig::default(),
627 )
628 .await
629 .unwrap();
630 assert!(logstore
631 .append(EntryImpl::create(1, 1, "1".as_bytes().to_vec()).into())
632 .await
633 .is_ok());
634 let entries = logstore
635 .read(&Provider::raft_engine_provider(1), 1, None)
636 .await
637 .unwrap()
638 .collect::<Vec<_>>()
639 .await;
640 assert_eq!(1, entries.len());
641 logstore.stop().await.unwrap();
642 }
643
644 let logstore = RaftEngineLogStore::try_new(
645 dir.path().to_str().unwrap().to_string(),
646 &RaftEngineConfig::default(),
647 )
648 .await
649 .unwrap();
650
651 let entries = collect_entries(
652 logstore
653 .read(&Provider::raft_engine_provider(1), 1, None)
654 .await
655 .unwrap(),
656 )
657 .await;
658 assert_eq!(1, entries.len());
659 assert_eq!(1, entries[0].entry_id());
660 assert_eq!(1, entries[0].region_id().as_u64());
661 }
662
663 async fn wal_dir_usage(path: impl AsRef<str>) -> usize {
664 let mut size: usize = 0;
665 let mut read_dir = tokio::fs::read_dir(path.as_ref()).await.unwrap();
666 while let Ok(dir_entry) = read_dir.next_entry().await {
667 let Some(entry) = dir_entry else {
668 break;
669 };
670 if entry.file_type().await.unwrap().is_file() {
671 let file_name = entry.file_name();
672 let file_size = entry.metadata().await.unwrap().len() as usize;
673 debug!("File: {file_name:?}, size: {file_size}");
674 size += file_size;
675 }
676 }
677 size
678 }
679
680 async fn new_test_log_store(dir: &TempDir) -> RaftEngineLogStore {
681 let path = dir.path().to_str().unwrap().to_string();
682
683 let config = RaftEngineConfig {
684 file_size: ReadableSize::mb(2),
685 purge_threshold: ReadableSize::mb(4),
686 purge_interval: Duration::from_secs(5),
687 ..Default::default()
688 };
689
690 RaftEngineLogStore::try_new(path, &config).await.unwrap()
691 }
692
693 #[tokio::test]
694 async fn test_compaction() {
695 common_telemetry::init_default_ut_logging();
696 let dir = create_temp_dir("raft-engine-logstore-test");
697 let logstore = new_test_log_store(&dir).await;
698
699 let region_id = RegionId::new(1, 1);
700 let namespace_id = region_id.as_u64();
701 let namespace = Provider::raft_engine_provider(namespace_id);
702 for id in 0..4096 {
703 let entry = EntryImpl::create(id, namespace_id, [b'x'; 4096].to_vec()).into();
704 let _ = logstore.append(entry).await.unwrap();
705 }
706
707 let before_purge = wal_dir_usage(dir.path().to_str().unwrap()).await;
708 logstore
709 .obsolete(&namespace, region_id, 4000)
710 .await
711 .unwrap();
712
713 tokio::time::sleep(Duration::from_secs(6)).await;
714 let after_purge = wal_dir_usage(dir.path().to_str().unwrap()).await;
715 debug!(
716 "Before purge: {}, after purge: {}",
717 before_purge, after_purge
718 );
719 assert!(before_purge > after_purge);
720 }
721
722 #[tokio::test]
723 async fn test_obsolete() {
724 common_telemetry::init_default_ut_logging();
725 let dir = create_temp_dir("raft-engine-logstore-test");
726 let logstore = new_test_log_store(&dir).await;
727
728 let region_id = RegionId::new(1, 1);
729 let namespace_id = region_id.as_u64();
730 let namespace = Provider::raft_engine_provider(namespace_id);
731 for id in 0..1024 {
732 let entry = EntryImpl::create(id, namespace_id, [b'x'; 4096].to_vec()).into();
733 let _ = logstore.append(entry).await.unwrap();
734 }
735
736 logstore.obsolete(&namespace, region_id, 100).await.unwrap();
737 assert_eq!(101, logstore.engine.first_index(namespace_id).unwrap());
738
739 let res = logstore.read(&namespace, 100, None).await.unwrap();
740 let mut vec = collect_entries(res).await;
741 vec.sort_by(|a, b| a.entry_id().partial_cmp(&b.entry_id()).unwrap());
742 assert_eq!(101, vec.first().unwrap().entry_id());
743 }
744
745 #[tokio::test]
746 async fn test_append_batch() {
747 common_telemetry::init_default_ut_logging();
748 let dir = create_temp_dir("logstore-append-batch-test");
749 let logstore = new_test_log_store(&dir).await;
750
751 let entries = (0..8)
752 .flat_map(|ns_id| {
753 let data = [ns_id as u8].repeat(4096);
754 (0..16).map(move |idx| EntryImpl::create(idx, ns_id, data.clone()).into())
755 })
756 .collect();
757
758 logstore.append_batch(entries).await.unwrap();
759 for ns_id in 0..8 {
760 let namespace = &RaftEngineProvider::new(ns_id);
761 let (first, last) = logstore.span(namespace);
762 assert_eq!(0, first.unwrap());
763 assert_eq!(15, last.unwrap());
764 }
765 }
766
767 #[tokio::test]
768 async fn test_append_batch_interleaved() {
769 common_telemetry::init_default_ut_logging();
770 let dir = create_temp_dir("logstore-append-batch-test");
771 let logstore = new_test_log_store(&dir).await;
772 let entries = vec![
773 EntryImpl::create(0, 0, [b'0'; 4096].to_vec()).into(),
774 EntryImpl::create(1, 0, [b'0'; 4096].to_vec()).into(),
775 EntryImpl::create(0, 1, [b'1'; 4096].to_vec()).into(),
776 EntryImpl::create(2, 0, [b'0'; 4096].to_vec()).into(),
777 EntryImpl::create(1, 1, [b'1'; 4096].to_vec()).into(),
778 ];
779
780 logstore.append_batch(entries).await.unwrap();
781
782 assert_eq!(
783 (Some(0), Some(2)),
784 logstore.span(&RaftEngineProvider::new(0))
785 );
786 assert_eq!(
787 (Some(0), Some(1)),
788 logstore.span(&RaftEngineProvider::new(1))
789 );
790 }
791
792 #[tokio::test]
793 async fn test_append_batch_response() {
794 common_telemetry::init_default_ut_logging();
795 let dir = create_temp_dir("logstore-append-batch-test");
796 let logstore = new_test_log_store(&dir).await;
797
798 let entries = vec![
799 EntryImpl::create(0, 0, [b'0'; 4096].to_vec()).into(),
801 EntryImpl::create(0, 1, [b'1'; 4096].to_vec()).into(),
803 EntryImpl::create(1, 0, [b'1'; 4096].to_vec()).into(),
805 EntryImpl::create(1, 1, [b'0'; 4096].to_vec()).into(),
807 EntryImpl::create(2, 2, [b'2'; 4096].to_vec()).into(),
809 ];
810
811 let last_entry_ids = logstore.append_batch(entries).await.unwrap().last_entry_ids;
813 assert_eq!(last_entry_ids[&(0.into())], 1);
814 assert_eq!(last_entry_ids[&(1.into())], 1);
815 assert_eq!(last_entry_ids[&(2.into())], 2);
816 }
817}