log_store/raft_engine/
log_store.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{hash_map, HashMap};
16use std::fmt::{Debug, Formatter};
17use std::sync::Arc;
18use std::time::Duration;
19
20use async_stream::stream;
21use common_runtime::{RepeatedTask, TaskFunction};
22use common_telemetry::{debug, error, info};
23use common_wal::config::raft_engine::RaftEngineConfig;
24use raft_engine::{Config, Engine, LogBatch, MessageExt, ReadableSize, RecoveryMode};
25use snafu::{ensure, OptionExt, ResultExt};
26use store_api::logstore::entry::{Entry, Id as EntryId, NaiveEntry};
27use store_api::logstore::provider::{Provider, RaftEngineProvider};
28use store_api::logstore::{AppendBatchResponse, LogStore, SendableEntryStream, WalIndex};
29use store_api::storage::RegionId;
30
31use crate::error::{
32    AddEntryLogBatchSnafu, DiscontinuousLogIndexSnafu, Error, FetchEntrySnafu,
33    IllegalNamespaceSnafu, IllegalStateSnafu, InvalidProviderSnafu, OverrideCompactedEntrySnafu,
34    RaftEngineSnafu, Result, StartWalTaskSnafu, StopWalTaskSnafu,
35};
36use crate::metrics;
37use crate::raft_engine::backend::SYSTEM_NAMESPACE;
38use crate::raft_engine::protos::logstore::{EntryImpl, NamespaceImpl};
39
40const NAMESPACE_PREFIX: &str = "$sys/";
41
42pub struct RaftEngineLogStore {
43    sync_write: bool,
44    sync_period: Option<Duration>,
45    read_batch_size: usize,
46    engine: Arc<Engine>,
47    gc_task: RepeatedTask<Error>,
48    sync_task: RepeatedTask<Error>,
49}
50
51pub struct PurgeExpiredFilesFunction {
52    pub engine: Arc<Engine>,
53}
54
55#[async_trait::async_trait]
56impl TaskFunction<Error> for PurgeExpiredFilesFunction {
57    fn name(&self) -> &str {
58        "RaftEngineLogStore-gc-task"
59    }
60
61    async fn call(&mut self) -> Result<()> {
62        match self.engine.purge_expired_files().context(RaftEngineSnafu) {
63            Ok(res) => {
64                // TODO(hl): the retval of purge_expired_files indicates the namespaces need to be compact,
65                // which is useful when monitoring regions failed to flush it's memtable to SSTs.
66                let log_string = format!(
67                    "Successfully purged logstore files, namespaces need compaction: {:?}",
68                    res
69                );
70                if res.is_empty() {
71                    debug!(log_string);
72                } else {
73                    info!(log_string);
74                }
75            }
76            Err(e) => {
77                error!(e; "Failed to purge files in logstore");
78            }
79        }
80
81        Ok(())
82    }
83}
84
85pub struct SyncWalTaskFunction {
86    engine: Arc<Engine>,
87}
88
89#[async_trait::async_trait]
90impl TaskFunction<Error> for SyncWalTaskFunction {
91    async fn call(&mut self) -> std::result::Result<(), Error> {
92        let engine = self.engine.clone();
93        if let Err(e) = tokio::task::spawn_blocking(move || engine.sync()).await {
94            error!(e; "Failed to sync raft engine log files");
95        };
96        Ok(())
97    }
98
99    fn name(&self) -> &str {
100        "SyncWalTaskFunction"
101    }
102}
103
104impl SyncWalTaskFunction {
105    pub fn new(engine: Arc<Engine>) -> Self {
106        Self { engine }
107    }
108}
109
110impl RaftEngineLogStore {
111    pub async fn try_new(dir: String, config: &RaftEngineConfig) -> Result<Self> {
112        let raft_engine_config = Config {
113            dir,
114            purge_threshold: ReadableSize(config.purge_threshold.0),
115            recovery_mode: RecoveryMode::TolerateTailCorruption,
116            batch_compression_threshold: ReadableSize::kb(8),
117            target_file_size: ReadableSize(config.file_size.0),
118            enable_log_recycle: config.enable_log_recycle,
119            prefill_for_recycle: config.prefill_log_files,
120            recovery_threads: config.recovery_parallelism,
121            ..Default::default()
122        };
123        let engine = Arc::new(Engine::open(raft_engine_config).context(RaftEngineSnafu)?);
124        let gc_task = RepeatedTask::new(
125            config.purge_interval,
126            Box::new(PurgeExpiredFilesFunction {
127                engine: engine.clone(),
128            }),
129        );
130
131        let sync_task = RepeatedTask::new(
132            config.sync_period.unwrap_or(Duration::from_secs(5)),
133            Box::new(SyncWalTaskFunction::new(engine.clone())),
134        );
135
136        let log_store = Self {
137            sync_write: config.sync_write,
138            sync_period: config.sync_period,
139            read_batch_size: config.read_batch_size,
140            engine,
141            gc_task,
142            sync_task,
143        };
144        log_store.start()?;
145        Ok(log_store)
146    }
147
148    pub fn started(&self) -> bool {
149        self.gc_task.started()
150    }
151
152    fn start(&self) -> Result<()> {
153        self.gc_task
154            .start(common_runtime::global_runtime())
155            .context(StartWalTaskSnafu { name: "gc_task" })?;
156        self.sync_task
157            .start(common_runtime::global_runtime())
158            .context(StartWalTaskSnafu { name: "sync_task" })
159    }
160
161    fn span(&self, provider: &RaftEngineProvider) -> (Option<u64>, Option<u64>) {
162        (
163            self.engine.first_index(provider.id),
164            self.engine.last_index(provider.id),
165        )
166    }
167
168    /// Converts entries to `LogBatch` and checks if entry ids are valid.
169    /// Returns the `LogBatch` converted along with the last entry id
170    /// to append in each namespace(region).
171    fn entries_to_batch(
172        &self,
173        entries: Vec<Entry>,
174    ) -> Result<(LogBatch, HashMap<RegionId, EntryId>)> {
175        // Records the last entry id for each region's entries.
176        let mut entry_ids: HashMap<RegionId, EntryId> = HashMap::with_capacity(entries.len());
177        let mut batch = LogBatch::with_capacity(entries.len());
178
179        for e in entries {
180            let region_id = e.region_id();
181            let entry_id = e.entry_id();
182            match entry_ids.entry(region_id) {
183                hash_map::Entry::Occupied(mut o) => {
184                    let prev = *o.get();
185                    ensure!(
186                        entry_id == prev + 1,
187                        DiscontinuousLogIndexSnafu {
188                            region_id,
189                            last_index: prev,
190                            attempt_index: entry_id
191                        }
192                    );
193                    o.insert(entry_id);
194                }
195                hash_map::Entry::Vacant(v) => {
196                    // this entry is the first in batch of given region.
197                    if let Some(first_index) = self.engine.first_index(region_id.as_u64()) {
198                        // ensure the first in batch does not override compacted entry.
199                        ensure!(
200                            entry_id > first_index,
201                            OverrideCompactedEntrySnafu {
202                                namespace: region_id,
203                                first_index,
204                                attempt_index: entry_id,
205                            }
206                        );
207                    }
208                    // ensure the first in batch does not form a hole in raft-engine.
209                    if let Some(last_index) = self.engine.last_index(region_id.as_u64()) {
210                        ensure!(
211                            entry_id == last_index + 1,
212                            DiscontinuousLogIndexSnafu {
213                                region_id,
214                                last_index,
215                                attempt_index: entry_id
216                            }
217                        );
218                    }
219                    v.insert(entry_id);
220                }
221            }
222            batch
223                .add_entries::<MessageType>(
224                    region_id.as_u64(),
225                    &[EntryImpl {
226                        id: entry_id,
227                        namespace_id: region_id.as_u64(),
228                        data: e.into_bytes(),
229                        ..Default::default()
230                    }],
231                )
232                .context(AddEntryLogBatchSnafu)?;
233        }
234
235        Ok((batch, entry_ids))
236    }
237}
238
239impl Debug for RaftEngineLogStore {
240    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
241        f.debug_struct("RaftEngineLogsStore")
242            .field("sync_write", &self.sync_write)
243            .field("sync_period", &self.sync_period)
244            .field("read_batch_size", &self.read_batch_size)
245            .field("started", &self.gc_task.started())
246            .finish()
247    }
248}
249
250#[async_trait::async_trait]
251impl LogStore for RaftEngineLogStore {
252    type Error = Error;
253
254    async fn stop(&self) -> Result<()> {
255        self.gc_task
256            .stop()
257            .await
258            .context(StopWalTaskSnafu { name: "gc_task" })?;
259        self.sync_task
260            .stop()
261            .await
262            .context(StopWalTaskSnafu { name: "sync_task" })
263    }
264
265    /// Appends a batch of entries to logstore. `RaftEngineLogStore` assures the atomicity of
266    /// batch append.
267    async fn append_batch(&self, entries: Vec<Entry>) -> Result<AppendBatchResponse> {
268        metrics::METRIC_RAFT_ENGINE_APPEND_BATCH_BYTES_TOTAL.inc_by(
269            entries
270                .iter()
271                .map(|entry| entry.estimated_size())
272                .sum::<usize>() as u64,
273        );
274        let _timer = metrics::METRIC_RAFT_ENGINE_APPEND_BATCH_ELAPSED.start_timer();
275
276        ensure!(self.started(), IllegalStateSnafu);
277        if entries.is_empty() {
278            return Ok(AppendBatchResponse::default());
279        }
280
281        let (mut batch, last_entry_ids) = self.entries_to_batch(entries)?;
282        let _ = self
283            .engine
284            .write(&mut batch, self.sync_write)
285            .context(RaftEngineSnafu)?;
286
287        Ok(AppendBatchResponse { last_entry_ids })
288    }
289
290    /// Create a stream of entries from logstore in the given namespace. The end of stream is
291    /// determined by the current "last index" of the namespace.
292    async fn read(
293        &self,
294        provider: &Provider,
295        entry_id: EntryId,
296        _index: Option<WalIndex>,
297    ) -> Result<SendableEntryStream<'static, Entry, Self::Error>> {
298        let ns = provider
299            .as_raft_engine_provider()
300            .with_context(|| InvalidProviderSnafu {
301                expected: RaftEngineProvider::type_name(),
302                actual: provider.type_name(),
303            })?;
304        let namespace_id = ns.id;
305        let _timer = metrics::METRIC_RAFT_ENGINE_READ_ELAPSED.start_timer();
306
307        ensure!(self.started(), IllegalStateSnafu);
308        let engine = self.engine.clone();
309
310        let last_index = engine.last_index(namespace_id).unwrap_or(0);
311        let mut start_index =
312            entry_id.max(engine.first_index(namespace_id).unwrap_or(last_index + 1));
313
314        info!(
315            "Read logstore, namespace: {}, start: {}, span: {:?}",
316            namespace_id,
317            entry_id,
318            self.span(ns)
319        );
320        let max_batch_size = self.read_batch_size;
321        let (tx, mut rx) = tokio::sync::mpsc::channel(max_batch_size);
322        let _handle = common_runtime::spawn_global(async move {
323            while start_index <= last_index {
324                let mut vec = Vec::with_capacity(max_batch_size);
325                match engine
326                    .fetch_entries_to::<MessageType>(
327                        namespace_id,
328                        start_index,
329                        last_index + 1,
330                        Some(max_batch_size),
331                        &mut vec,
332                    )
333                    .context(FetchEntrySnafu {
334                        ns: namespace_id,
335                        start: start_index,
336                        end: last_index,
337                        max_size: max_batch_size,
338                    }) {
339                    Ok(_) => {
340                        if let Some(last_entry) = vec.last() {
341                            start_index = last_entry.id + 1;
342                        }
343                        // reader side closed, cancel following reads
344                        if tx.send(Ok(vec)).await.is_err() {
345                            break;
346                        }
347                    }
348                    Err(e) => {
349                        let _ = tx.send(Err(e)).await;
350                        break;
351                    }
352                }
353            }
354        });
355
356        let s = stream!({
357            while let Some(res) = rx.recv().await {
358                let res = res?;
359
360                yield Ok(res.into_iter().map(Entry::from).collect::<Vec<_>>());
361            }
362        });
363        Ok(Box::pin(s))
364    }
365
366    async fn create_namespace(&self, ns: &Provider) -> Result<()> {
367        let ns = ns
368            .as_raft_engine_provider()
369            .with_context(|| InvalidProviderSnafu {
370                expected: RaftEngineProvider::type_name(),
371                actual: ns.type_name(),
372            })?;
373        let namespace_id = ns.id;
374        ensure!(
375            namespace_id != SYSTEM_NAMESPACE,
376            IllegalNamespaceSnafu { ns: namespace_id }
377        );
378        ensure!(self.started(), IllegalStateSnafu);
379        let key = format!("{}{}", NAMESPACE_PREFIX, namespace_id)
380            .as_bytes()
381            .to_vec();
382        let mut batch = LogBatch::with_capacity(1);
383        batch
384            .put_message::<NamespaceImpl>(
385                SYSTEM_NAMESPACE,
386                key,
387                &NamespaceImpl {
388                    id: namespace_id,
389                    ..Default::default()
390                },
391            )
392            .context(RaftEngineSnafu)?;
393        let _ = self
394            .engine
395            .write(&mut batch, true)
396            .context(RaftEngineSnafu)?;
397        Ok(())
398    }
399
400    async fn delete_namespace(&self, ns: &Provider) -> Result<()> {
401        let ns = ns
402            .as_raft_engine_provider()
403            .with_context(|| InvalidProviderSnafu {
404                expected: RaftEngineProvider::type_name(),
405                actual: ns.type_name(),
406            })?;
407        let namespace_id = ns.id;
408        ensure!(
409            namespace_id != SYSTEM_NAMESPACE,
410            IllegalNamespaceSnafu { ns: namespace_id }
411        );
412        ensure!(self.started(), IllegalStateSnafu);
413        let key = format!("{}{}", NAMESPACE_PREFIX, namespace_id)
414            .as_bytes()
415            .to_vec();
416        let mut batch = LogBatch::with_capacity(1);
417        batch.delete(SYSTEM_NAMESPACE, key);
418        let _ = self
419            .engine
420            .write(&mut batch, true)
421            .context(RaftEngineSnafu)?;
422        Ok(())
423    }
424
425    async fn list_namespaces(&self) -> Result<Vec<Provider>> {
426        ensure!(self.started(), IllegalStateSnafu);
427        let mut namespaces: Vec<Provider> = vec![];
428        self.engine
429            .scan_messages::<NamespaceImpl, _>(
430                SYSTEM_NAMESPACE,
431                Some(NAMESPACE_PREFIX.as_bytes()),
432                None,
433                false,
434                |_, v| {
435                    namespaces.push(Provider::RaftEngine(RaftEngineProvider { id: v.id }));
436                    true
437                },
438            )
439            .context(RaftEngineSnafu)?;
440        Ok(namespaces)
441    }
442
443    fn entry(
444        &self,
445        data: &mut Vec<u8>,
446        entry_id: EntryId,
447        region_id: RegionId,
448        provider: &Provider,
449    ) -> Result<Entry> {
450        debug_assert_eq!(
451            provider.as_raft_engine_provider().unwrap().id,
452            region_id.as_u64()
453        );
454        Ok(Entry::Naive(NaiveEntry {
455            provider: provider.clone(),
456            region_id,
457            entry_id,
458            data: std::mem::take(data),
459        }))
460    }
461
462    async fn obsolete(
463        &self,
464        provider: &Provider,
465        _region_id: RegionId,
466        entry_id: EntryId,
467    ) -> Result<()> {
468        let ns = provider
469            .as_raft_engine_provider()
470            .with_context(|| InvalidProviderSnafu {
471                expected: RaftEngineProvider::type_name(),
472                actual: provider.type_name(),
473            })?;
474        let namespace_id = ns.id;
475        ensure!(self.started(), IllegalStateSnafu);
476        let obsoleted = self.engine.compact_to(namespace_id, entry_id + 1);
477        info!(
478            "Namespace {} obsoleted {} entries, compacted index: {}, span: {:?}",
479            namespace_id,
480            obsoleted,
481            entry_id,
482            self.span(ns)
483        );
484        Ok(())
485    }
486
487    fn high_watermark(&self, provider: &Provider) -> Result<EntryId> {
488        let ns = provider
489            .as_raft_engine_provider()
490            .with_context(|| InvalidProviderSnafu {
491                expected: RaftEngineProvider::type_name(),
492                actual: provider.type_name(),
493            })?;
494        let namespace_id = ns.id;
495        let last_index = self.engine.last_index(namespace_id).unwrap_or(0);
496        Ok(last_index)
497    }
498}
499
500#[derive(Debug, Clone)]
501struct MessageType;
502
503impl MessageExt for MessageType {
504    type Entry = EntryImpl;
505
506    fn index(e: &Self::Entry) -> u64 {
507        e.id
508    }
509}
510
511#[cfg(test)]
512impl RaftEngineLogStore {
513    /// Appends a batch of entries and returns a response containing a map where the key is a region id
514    /// while the value is the id of the last successfully written entry of the region.
515    async fn append(&self, entry: Entry) -> Result<store_api::logstore::AppendResponse> {
516        let response = self.append_batch(vec![entry]).await?;
517        if let Some((_, last_entry_id)) = response.last_entry_ids.into_iter().next() {
518            return Ok(store_api::logstore::AppendResponse { last_entry_id });
519        }
520        unreachable!()
521    }
522}
523
524#[cfg(test)]
525mod tests {
526    use std::collections::HashSet;
527    use std::time::Duration;
528
529    use common_base::readable_size::ReadableSize;
530    use common_telemetry::debug;
531    use common_test_util::temp_dir::{create_temp_dir, TempDir};
532    use futures_util::StreamExt;
533    use store_api::logstore::{LogStore, SendableEntryStream};
534
535    use super::*;
536    use crate::error::Error;
537    use crate::raft_engine::log_store::RaftEngineLogStore;
538    use crate::raft_engine::protos::logstore::EntryImpl;
539
540    #[tokio::test]
541    async fn test_open_logstore() {
542        let dir = create_temp_dir("raft-engine-logstore-test");
543        let logstore = RaftEngineLogStore::try_new(
544            dir.path().to_str().unwrap().to_string(),
545            &RaftEngineConfig::default(),
546        )
547        .await
548        .unwrap();
549        let namespaces = logstore.list_namespaces().await.unwrap();
550        assert_eq!(0, namespaces.len());
551    }
552
553    #[tokio::test]
554    async fn test_manage_namespace() {
555        let dir = create_temp_dir("raft-engine-logstore-test");
556        let logstore = RaftEngineLogStore::try_new(
557            dir.path().to_str().unwrap().to_string(),
558            &RaftEngineConfig::default(),
559        )
560        .await
561        .unwrap();
562        assert!(logstore.list_namespaces().await.unwrap().is_empty());
563
564        logstore
565            .create_namespace(&Provider::raft_engine_provider(42))
566            .await
567            .unwrap();
568        let namespaces = logstore.list_namespaces().await.unwrap();
569        assert_eq!(1, namespaces.len());
570        assert_eq!(Provider::raft_engine_provider(42), namespaces[0]);
571
572        logstore
573            .delete_namespace(&Provider::raft_engine_provider(42))
574            .await
575            .unwrap();
576        assert!(logstore.list_namespaces().await.unwrap().is_empty());
577    }
578
579    #[tokio::test]
580    async fn test_append_and_read() {
581        let dir = create_temp_dir("raft-engine-logstore-test");
582        let logstore = RaftEngineLogStore::try_new(
583            dir.path().to_str().unwrap().to_string(),
584            &RaftEngineConfig::default(),
585        )
586        .await
587        .unwrap();
588
589        let namespace_id = 1;
590        let cnt = 1024;
591        for i in 0..cnt {
592            let response = logstore
593                .append(
594                    EntryImpl::create(i, namespace_id, i.to_string().as_bytes().to_vec()).into(),
595                )
596                .await
597                .unwrap();
598            assert_eq!(i, response.last_entry_id);
599        }
600        let mut entries = HashSet::with_capacity(1024);
601        let mut s = logstore
602            .read(&Provider::raft_engine_provider(1), 0, None)
603            .await
604            .unwrap();
605        while let Some(r) = s.next().await {
606            let vec = r.unwrap();
607            entries.extend(vec.into_iter().map(|e| e.entry_id()));
608        }
609        assert_eq!((0..cnt).collect::<HashSet<_>>(), entries);
610    }
611
612    async fn collect_entries(mut s: SendableEntryStream<'_, Entry, Error>) -> Vec<Entry> {
613        let mut res = vec![];
614        while let Some(r) = s.next().await {
615            res.extend(r.unwrap());
616        }
617        res
618    }
619
620    #[tokio::test]
621    async fn test_reopen() {
622        let dir = create_temp_dir("raft-engine-logstore-reopen-test");
623        {
624            let logstore = RaftEngineLogStore::try_new(
625                dir.path().to_str().unwrap().to_string(),
626                &RaftEngineConfig::default(),
627            )
628            .await
629            .unwrap();
630            assert!(logstore
631                .append(EntryImpl::create(1, 1, "1".as_bytes().to_vec()).into())
632                .await
633                .is_ok());
634            let entries = logstore
635                .read(&Provider::raft_engine_provider(1), 1, None)
636                .await
637                .unwrap()
638                .collect::<Vec<_>>()
639                .await;
640            assert_eq!(1, entries.len());
641            logstore.stop().await.unwrap();
642        }
643
644        let logstore = RaftEngineLogStore::try_new(
645            dir.path().to_str().unwrap().to_string(),
646            &RaftEngineConfig::default(),
647        )
648        .await
649        .unwrap();
650
651        let entries = collect_entries(
652            logstore
653                .read(&Provider::raft_engine_provider(1), 1, None)
654                .await
655                .unwrap(),
656        )
657        .await;
658        assert_eq!(1, entries.len());
659        assert_eq!(1, entries[0].entry_id());
660        assert_eq!(1, entries[0].region_id().as_u64());
661    }
662
663    async fn wal_dir_usage(path: impl AsRef<str>) -> usize {
664        let mut size: usize = 0;
665        let mut read_dir = tokio::fs::read_dir(path.as_ref()).await.unwrap();
666        while let Ok(dir_entry) = read_dir.next_entry().await {
667            let Some(entry) = dir_entry else {
668                break;
669            };
670            if entry.file_type().await.unwrap().is_file() {
671                let file_name = entry.file_name();
672                let file_size = entry.metadata().await.unwrap().len() as usize;
673                debug!("File: {file_name:?}, size: {file_size}");
674                size += file_size;
675            }
676        }
677        size
678    }
679
680    async fn new_test_log_store(dir: &TempDir) -> RaftEngineLogStore {
681        let path = dir.path().to_str().unwrap().to_string();
682
683        let config = RaftEngineConfig {
684            file_size: ReadableSize::mb(2),
685            purge_threshold: ReadableSize::mb(4),
686            purge_interval: Duration::from_secs(5),
687            ..Default::default()
688        };
689
690        RaftEngineLogStore::try_new(path, &config).await.unwrap()
691    }
692
693    #[tokio::test]
694    async fn test_compaction() {
695        common_telemetry::init_default_ut_logging();
696        let dir = create_temp_dir("raft-engine-logstore-test");
697        let logstore = new_test_log_store(&dir).await;
698
699        let region_id = RegionId::new(1, 1);
700        let namespace_id = region_id.as_u64();
701        let namespace = Provider::raft_engine_provider(namespace_id);
702        for id in 0..4096 {
703            let entry = EntryImpl::create(id, namespace_id, [b'x'; 4096].to_vec()).into();
704            let _ = logstore.append(entry).await.unwrap();
705        }
706
707        let before_purge = wal_dir_usage(dir.path().to_str().unwrap()).await;
708        logstore
709            .obsolete(&namespace, region_id, 4000)
710            .await
711            .unwrap();
712
713        tokio::time::sleep(Duration::from_secs(6)).await;
714        let after_purge = wal_dir_usage(dir.path().to_str().unwrap()).await;
715        debug!(
716            "Before purge: {}, after purge: {}",
717            before_purge, after_purge
718        );
719        assert!(before_purge > after_purge);
720    }
721
722    #[tokio::test]
723    async fn test_obsolete() {
724        common_telemetry::init_default_ut_logging();
725        let dir = create_temp_dir("raft-engine-logstore-test");
726        let logstore = new_test_log_store(&dir).await;
727
728        let region_id = RegionId::new(1, 1);
729        let namespace_id = region_id.as_u64();
730        let namespace = Provider::raft_engine_provider(namespace_id);
731        for id in 0..1024 {
732            let entry = EntryImpl::create(id, namespace_id, [b'x'; 4096].to_vec()).into();
733            let _ = logstore.append(entry).await.unwrap();
734        }
735
736        logstore.obsolete(&namespace, region_id, 100).await.unwrap();
737        assert_eq!(101, logstore.engine.first_index(namespace_id).unwrap());
738
739        let res = logstore.read(&namespace, 100, None).await.unwrap();
740        let mut vec = collect_entries(res).await;
741        vec.sort_by(|a, b| a.entry_id().partial_cmp(&b.entry_id()).unwrap());
742        assert_eq!(101, vec.first().unwrap().entry_id());
743    }
744
745    #[tokio::test]
746    async fn test_append_batch() {
747        common_telemetry::init_default_ut_logging();
748        let dir = create_temp_dir("logstore-append-batch-test");
749        let logstore = new_test_log_store(&dir).await;
750
751        let entries = (0..8)
752            .flat_map(|ns_id| {
753                let data = [ns_id as u8].repeat(4096);
754                (0..16).map(move |idx| EntryImpl::create(idx, ns_id, data.clone()).into())
755            })
756            .collect();
757
758        logstore.append_batch(entries).await.unwrap();
759        for ns_id in 0..8 {
760            let namespace = &RaftEngineProvider::new(ns_id);
761            let (first, last) = logstore.span(namespace);
762            assert_eq!(0, first.unwrap());
763            assert_eq!(15, last.unwrap());
764        }
765    }
766
767    #[tokio::test]
768    async fn test_append_batch_interleaved() {
769        common_telemetry::init_default_ut_logging();
770        let dir = create_temp_dir("logstore-append-batch-test");
771        let logstore = new_test_log_store(&dir).await;
772        let entries = vec![
773            EntryImpl::create(0, 0, [b'0'; 4096].to_vec()).into(),
774            EntryImpl::create(1, 0, [b'0'; 4096].to_vec()).into(),
775            EntryImpl::create(0, 1, [b'1'; 4096].to_vec()).into(),
776            EntryImpl::create(2, 0, [b'0'; 4096].to_vec()).into(),
777            EntryImpl::create(1, 1, [b'1'; 4096].to_vec()).into(),
778        ];
779
780        logstore.append_batch(entries).await.unwrap();
781
782        assert_eq!(
783            (Some(0), Some(2)),
784            logstore.span(&RaftEngineProvider::new(0))
785        );
786        assert_eq!(
787            (Some(0), Some(1)),
788            logstore.span(&RaftEngineProvider::new(1))
789        );
790    }
791
792    #[tokio::test]
793    async fn test_append_batch_response() {
794        common_telemetry::init_default_ut_logging();
795        let dir = create_temp_dir("logstore-append-batch-test");
796        let logstore = new_test_log_store(&dir).await;
797
798        let entries = vec![
799            // Entry[0] from region 0.
800            EntryImpl::create(0, 0, [b'0'; 4096].to_vec()).into(),
801            // Entry[0] from region 1.
802            EntryImpl::create(0, 1, [b'1'; 4096].to_vec()).into(),
803            // Entry[1] from region 1.
804            EntryImpl::create(1, 0, [b'1'; 4096].to_vec()).into(),
805            // Entry[1] from region 0.
806            EntryImpl::create(1, 1, [b'0'; 4096].to_vec()).into(),
807            // Entry[2] from region 2.
808            EntryImpl::create(2, 2, [b'2'; 4096].to_vec()).into(),
809        ];
810
811        // Ensure the last entry id returned for each region is the expected one.
812        let last_entry_ids = logstore.append_batch(entries).await.unwrap().last_entry_ids;
813        assert_eq!(last_entry_ids[&(0.into())], 1);
814        assert_eq!(last_entry_ids[&(1.into())], 1);
815        assert_eq!(last_entry_ids[&(2.into())], 2);
816    }
817}