log_store/kafka/log_store.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::{Duration, Instant};

use common_base::readable_size::ReadableSize;
use common_meta::datanode::TopicStatsReporter;
use common_meta::distributed_time_constants::TOPIC_STATS_REPORT_INTERVAL_SECS;
use common_telemetry::{debug, info, warn};
use common_time::util::current_time_millis;
use common_wal::config::kafka::DatanodeKafkaConfig;
use dashmap::DashMap;
use futures::future::try_join_all;
use futures_util::StreamExt;
use rskafka::client::partition::OffsetAt;
use snafu::{OptionExt, ResultExt};
use store_api::logstore::entry::{
    Entry, Id as EntryId, MultiplePartEntry, MultiplePartHeader, NaiveEntry,
};
use store_api::logstore::provider::{KafkaProvider, Provider};
use store_api::logstore::{AppendBatchResponse, LogStore, SendableEntryStream, WalIndex};
use store_api::storage::RegionId;

use crate::error::{self, ConsumeRecordSnafu, Error, GetOffsetSnafu, InvalidProviderSnafu, Result};
use crate::kafka::client_manager::{ClientManager, ClientManagerRef};
use crate::kafka::consumer::{ConsumerBuilder, RecordsBuffer};
use crate::kafka::index::{
    build_region_wal_index_iterator, GlobalIndexCollector, MIN_BATCH_WINDOW_SIZE,
};
use crate::kafka::periodic_offset_fetcher::PeriodicOffsetFetcher;
use crate::kafka::producer::OrderedBatchProducerRef;
use crate::kafka::util::record::{
    convert_to_kafka_records, maybe_emit_entry, remaining_entries, Record, ESTIMATED_META_SIZE,
};
use crate::metrics;

const DEFAULT_OFFSET_FETCH_INTERVAL: Duration = Duration::from_secs(60);

/// Statistics for a topic.
#[derive(Debug, Clone, Copy, Default)]
pub struct TopicStat {
    /// Latest offset for the topic.
    ///
    /// The latest offset is updated in two ways:
    /// - Automatically when the producer successfully commits data to Kafka.
    /// - Periodically by the [PeriodicOffsetFetcher](crate::kafka::periodic_offset_fetcher::PeriodicOffsetFetcher).
    pub latest_offset: u64,
    /// Total size in bytes of records appended to the topic.
    pub record_size: u64,
    /// Total number of records appended to the topic.
    pub record_num: u64,
}

/// A log store backed by Kafka.
#[derive(Debug)]
pub struct KafkaLogStore {
    /// The manager of topic clients.
    client_manager: ClientManagerRef,
    /// The max size of a batch.
    max_batch_bytes: usize,
    /// The consumer wait timeout.
    consumer_wait_timeout: Duration,
    /// Whether to ignore missing entries when reading the WAL by overwriting the
    /// start entry id with the topic's earliest offset.
    overwrite_entry_start_id: bool,
    /// The stats of each topic.
    ///
    /// This shared map allows multiple components to access the latest stats
    /// information without needing to query Kafka directly.
    topic_stats: Arc<DashMap<Arc<KafkaProvider>, TopicStat>>,
}

/// Periodically reports the stats of each topic, aligned to a fixed report interval.
struct PeriodicTopicStatsReporter {
    topic_stats: Arc<DashMap<Arc<KafkaProvider>, TopicStat>>,
    last_reported_timestamp_millis: i64,
    report_interval_millis: i64,
}

impl PeriodicTopicStatsReporter {
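    /// Aligns `ts` down to the start of its reporting window.
    ///
    /// A minimal sketch of the expected behavior (values are illustrative only):
    ///
    /// ```ignore
    /// // With a 1-second (1_000 ms) report interval:
    /// assert_eq!(PeriodicTopicStatsReporter::align_ts(10_500, 1_000), 10_000);
    /// assert_eq!(PeriodicTopicStatsReporter::align_ts(999, 1_000), 0);
    /// ```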
    fn align_ts(ts: i64, report_interval_millis: i64) -> i64 {
        (ts / report_interval_millis) * report_interval_millis
    }

    /// Creates a new [PeriodicTopicStatsReporter].
    ///
    /// # Panics
    ///
    /// Panics if `report_interval` is zero.
    fn new(
        topic_stats: Arc<DashMap<Arc<KafkaProvider>, TopicStat>>,
        report_interval: Duration,
    ) -> Self {
        assert!(!report_interval.is_zero());
        let report_interval_millis = report_interval.as_millis() as i64;
        let last_reported_timestamp_millis =
            Self::align_ts(current_time_millis(), report_interval_millis);

        Self {
            topic_stats,
            last_reported_timestamp_millis,
            report_interval_millis,
        }
    }
}

impl TopicStatsReporter for PeriodicTopicStatsReporter {
    fn reportable_topics(&mut self) -> Vec<common_meta::datanode::TopicStat> {
        let now = Self::align_ts(current_time_millis(), self.report_interval_millis);
        if now < self.last_reported_timestamp_millis + self.report_interval_millis {
            debug!("Skip reporting topic stats because the report interval has not elapsed");
            return vec![];
        }

        self.last_reported_timestamp_millis = now;
        let mut reportable_topics = Vec::with_capacity(self.topic_stats.len());
        for e in self.topic_stats.iter() {
            let topic_stat = e.value();
            let topic_stat = common_meta::datanode::TopicStat {
                topic: e.key().topic.clone(),
                latest_entry_id: topic_stat.latest_offset,
                record_size: topic_stat.record_size,
                record_num: topic_stat.record_num,
            };
            debug!("Reportable topic: {:?}", topic_stat);
            reportable_topics.push(topic_stat);
        }
        debug!("Found {} reportable topics at {}", reportable_topics.len(), now);
        reportable_topics
    }
}

impl KafkaLogStore {
    /// Tries to create a Kafka log store.
    pub async fn try_new(
        config: &DatanodeKafkaConfig,
        global_index_collector: Option<GlobalIndexCollector>,
    ) -> Result<Self> {
        let topic_stats = Arc::new(DashMap::new());
        let client_manager = Arc::new(
            ClientManager::try_new(config, global_index_collector, topic_stats.clone()).await?,
        );
        let fetcher =
            PeriodicOffsetFetcher::new(DEFAULT_OFFSET_FETCH_INTERVAL, client_manager.clone());
        fetcher.run().await;

        Ok(Self {
            client_manager,
            max_batch_bytes: config.max_batch_bytes.as_bytes() as usize,
            consumer_wait_timeout: config.consumer_wait_timeout,
            overwrite_entry_start_id: config.overwrite_entry_start_id,
            topic_stats,
        })
    }

    /// Returns a reporter that periodically reports the stats of each topic.
    pub fn topic_stats_reporter(&self) -> Box<dyn TopicStatsReporter> {
        Box::new(PeriodicTopicStatsReporter::new(
            self.topic_stats.clone(),
            Duration::from_secs(TOPIC_STATS_REPORT_INTERVAL_SECS),
        ))
    }
}

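/// Builds an [Entry] from `data`, splitting it into a [MultiplePartEntry] when the
/// payload exceeds `max_data_size`.
///
/// A minimal sketch of the splitting behavior (sizes and the `provider` binding are
/// illustrative assumptions):
///
/// ```ignore
/// // 100 bytes with a 40-byte limit are split into parts of 40, 40 and 20 bytes
/// // carrying the headers First, Middle(1) and Last respectively.
/// let entry = build_entry(vec![0u8; 100], 1, RegionId::new(1, 1), &provider, 40);
/// ```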
fn build_entry(
    data: Vec<u8>,
    entry_id: EntryId,
    region_id: RegionId,
    provider: &Provider,
    max_data_size: usize,
) -> Entry {
    if data.len() <= max_data_size {
        Entry::Naive(NaiveEntry {
            provider: provider.clone(),
            region_id,
            entry_id,
            data,
        })
    } else {
        let parts = data
            .chunks(max_data_size)
            .map(|s| s.into())
            .collect::<Vec<_>>();
        let num_parts = parts.len();

        let mut headers = Vec::with_capacity(num_parts);
        headers.push(MultiplePartHeader::First);
        headers.extend((1..num_parts - 1).map(MultiplePartHeader::Middle));
        headers.push(MultiplePartHeader::Last);

        Entry::MultiplePart(MultiplePartEntry {
            provider: provider.clone(),
            region_id,
            entry_id,
            headers,
            parts,
        })
    }
}

#[async_trait::async_trait]
impl LogStore for KafkaLogStore {
    type Error = Error;

    /// Creates an [Entry], splitting `data` into multiple parts if it exceeds the maximum payload size.
    fn entry(
        &self,
        data: Vec<u8>,
        entry_id: EntryId,
        region_id: RegionId,
        provider: &Provider,
    ) -> Result<Entry> {
        provider
            .as_kafka_provider()
            .with_context(|| InvalidProviderSnafu {
                expected: KafkaProvider::type_name(),
                actual: provider.type_name(),
            })?;

        let max_data_size = self.max_batch_bytes - ESTIMATED_META_SIZE;
        Ok(build_entry(
            data,
            entry_id,
            region_id,
            provider,
            max_data_size,
        ))
    }

    /// Appends a batch of entries and returns a response containing a map where the key is a region id
    /// and the value is the id of the last successfully written entry of that region.
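    ///
    /// A hypothetical usage sketch (assumes `log_store`, `provider`, and `region_id`
    /// are already set up; not taken from the source):
    ///
    /// ```ignore
    /// let entry = log_store.entry(b"payload".to_vec(), 0, region_id, &provider)?;
    /// let response = log_store.append_batch(vec![entry]).await?;
    /// // The value is the Kafka offset of the last record written for the region.
    /// let last_entry_id = response.last_entry_ids[&region_id];
    /// ```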
    async fn append_batch(&self, entries: Vec<Entry>) -> Result<AppendBatchResponse> {
        metrics::METRIC_KAFKA_APPEND_BATCH_BYTES_TOTAL.inc_by(
            entries
                .iter()
                .map(|entry| entry.estimated_size())
                .sum::<usize>() as u64,
        );
        let _timer = metrics::METRIC_KAFKA_APPEND_BATCH_ELAPSED.start_timer();

        if entries.is_empty() {
            return Ok(AppendBatchResponse::default());
        }

        let region_ids = entries
            .iter()
            .map(|entry| entry.region_id())
            .collect::<HashSet<_>>();
        let mut region_grouped_records: HashMap<RegionId, (OrderedBatchProducerRef, Vec<_>)> =
            HashMap::with_capacity(region_ids.len());
        let mut region_to_provider = HashMap::with_capacity(region_ids.len());
        for entry in entries {
            let provider = entry.provider().as_kafka_provider().with_context(|| {
                error::InvalidProviderSnafu {
                    expected: KafkaProvider::type_name(),
                    actual: entry.provider().type_name(),
                }
            })?;
            region_to_provider.insert(entry.region_id(), provider.clone());
            let region_id = entry.region_id();
            match region_grouped_records.entry(region_id) {
                std::collections::hash_map::Entry::Occupied(mut slot) => {
                    slot.get_mut().1.extend(convert_to_kafka_records(entry)?);
                }
                std::collections::hash_map::Entry::Vacant(slot) => {
                    let producer = self
                        .client_manager
                        .get_or_insert(provider)
                        .await?
                        .producer()
                        .clone();

                    slot.insert((producer, convert_to_kafka_records(entry)?));
                }
            }
        }

        let mut region_grouped_result_receivers = Vec::with_capacity(region_ids.len());
        for (region_id, (producer, records)) in region_grouped_records {
            // Safety: `KafkaLogStore::entry` will ensure that the
            // `Record`'s `approximate_size` must be less than or equal to `max_batch_bytes`.
            region_grouped_result_receivers
                .push((region_id, producer.produce(region_id, records).await?))
        }

        let region_grouped_max_offset =
            try_join_all(region_grouped_result_receivers.into_iter().map(
                |(region_id, receiver)| async move {
                    receiver.wait().await.map(|offset| (region_id, offset))
                },
            ))
            .await?;
        debug!(
            "Appended batch to Kafka, region_grouped_max_offset: {:?}",
            region_grouped_max_offset
        );

        Ok(AppendBatchResponse {
            last_entry_ids: region_grouped_max_offset.into_iter().collect(),
        })
    }

    /// Creates a new `EntryStream` that asynchronously generates entries with entry ids.
    /// Returns entries belonging to `provider`, starting from `entry_id`.
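    ///
    /// A hypothetical usage sketch (assumes `log_store` and `provider` are already
    /// set up; not taken from the source):
    ///
    /// ```ignore
    /// use futures::TryStreamExt;
    ///
    /// let mut stream = log_store.read(&provider, 0, None).await?;
    /// while let Some(entries) = stream.try_next().await? {
    ///     // Each entry carries the Kafka offset of its last record as its entry id.
    /// }
    /// ```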
    async fn read(
        &self,
        provider: &Provider,
        mut entry_id: EntryId,
        index: Option<WalIndex>,
    ) -> Result<SendableEntryStream<'static, Entry, Self::Error>> {
        let provider = provider
            .as_kafka_provider()
            .with_context(|| InvalidProviderSnafu {
                expected: KafkaProvider::type_name(),
                actual: provider.type_name(),
            })?;

        let _timer = metrics::METRIC_KAFKA_READ_ELAPSED.start_timer();

        // Gets the client associated with the topic.
        let client = self
            .client_manager
            .get_or_insert(provider)
            .await?
            .client()
            .clone();

        if self.overwrite_entry_start_id {
            let start_offset =
                client
                    .get_offset(OffsetAt::Earliest)
                    .await
                    .context(GetOffsetSnafu {
                        topic: &provider.topic,
                    })?;

            if entry_id as i64 <= start_offset {
                warn!(
                    "The entry_id: {} is not greater than the start_offset: {}, topic: {}. Overwriting entry_id with start_offset",
                    entry_id, start_offset, &provider.topic
                );

                entry_id = start_offset as u64;
            }
        }

        // Gets the offset of the latest record in the topic. In practice, this is the latest record of the topic's single partition.
        // The read operation terminates when this record is consumed.
        // Warning: `get_offset` returns the offset one past the latest record (the end offset), so it must be decremented for our usage.
        // See: https://kafka.apache.org/36/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#endOffsets(java.util.Collection)
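        // Illustrative example (assumed values, not from the source): if the topic
        // currently holds records at offsets 0..=41, `OffsetAt::Latest` returns 42,
        // and the last readable record sits at offset 41 (i.e. 42 - 1).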
        let end_offset = client
            .get_offset(OffsetAt::Latest)
            .await
            .context(GetOffsetSnafu {
                topic: &provider.topic,
            })?;
        let latest_offset = (end_offset as u64).saturating_sub(1);
        self.topic_stats
            .entry(provider.clone())
            .and_modify(|stat| {
                stat.latest_offset = stat.latest_offset.max(latest_offset);
            })
            .or_insert_with(|| TopicStat {
                latest_offset,
                record_size: 0,
                record_num: 0,
            });

        let region_indexes = if let (Some(index), Some(collector)) =
            (index, self.client_manager.global_index_collector())
        {
            collector
                .read_remote_region_index(index.location_id, provider, index.region_id, entry_id)
                .await?
        } else {
            None
        };

        let Some(iterator) = build_region_wal_index_iterator(
            entry_id,
            end_offset as u64,
            region_indexes,
            self.max_batch_bytes,
            MIN_BATCH_WINDOW_SIZE,
        ) else {
            let range = entry_id..end_offset as u64;
            warn!("No new entries in range {:?} of ns {}", range, provider);
            return Ok(futures_util::stream::empty().boxed());
        };

        debug!("Reading entries with {:?} of ns {}", iterator, provider);

        // Safety: Must be ok.
        let mut stream_consumer = ConsumerBuilder::default()
            .client(client)
            // Safety: checked before.
            .buffer(RecordsBuffer::new(iterator))
            .max_batch_size(self.max_batch_bytes)
            .max_wait_ms(self.consumer_wait_timeout.as_millis() as u32)
            .build()
            .unwrap();

        // A buffer is used to collect records to construct a complete entry.
        let mut entry_records: HashMap<RegionId, Vec<Record>> = HashMap::new();
        let provider = provider.clone();
        let stream = async_stream::stream!({
            let now = Instant::now();
            while let Some(consume_result) = stream_consumer.next().await {
                // Each call to `next` on the stream consumer produces a `RecordAndOffset` and a high watermark offset.
                // The `RecordAndOffset` contains the record data and its start offset.
                // The high watermark offset is the offset of the last record plus one.
                let (record_and_offset, high_watermark) =
                    consume_result.context(ConsumeRecordSnafu {
                        topic: &provider.topic,
                    })?;
                let (kafka_record, offset) = (record_and_offset.record, record_and_offset.offset);

                debug!(
                    "Read a record at offset {} for topic {}, high watermark: {}",
                    offset, provider.topic, high_watermark
                );

                // Ignores no-op records.
                if kafka_record.value.is_none() {
                    if check_termination(offset, end_offset) {
                        if let Some(entries) = remaining_entries(&provider, &mut entry_records) {
                            yield Ok(entries);
                        }
                        break;
                    }
                    continue;
                }

                let record = Record::try_from(kafka_record)?;
                // Tries to construct an entry from records consumed so far.
                if let Some(mut entry) = maybe_emit_entry(&provider, record, &mut entry_records)? {
                    // We don't rely on the EntryId generated by mito2.
                    // Instead, we use the offset returned from Kafka as EntryId.
                    // Therefore, we MUST overwrite the EntryId with RecordOffset.
                    entry.set_entry_id(offset as u64);
                    yield Ok(vec![entry]);
                }

                if check_termination(offset, end_offset) {
                    if let Some(entries) = remaining_entries(&provider, &mut entry_records) {
                        yield Ok(entries);
                    }
                    break;
                }
            }

            metrics::METRIC_KAFKA_READ_BYTES_TOTAL.inc_by(stream_consumer.total_fetched_bytes());

            info!(
                "Fetched {} bytes from topic: {}, start_entry_id: {}, end_offset: {}, elapsed: {:?}",
                ReadableSize(stream_consumer.total_fetched_bytes()),
                stream_consumer.topic(),
                entry_id,
                end_offset,
                now.elapsed()
            );
        });
        Ok(Box::pin(stream))
    }

    /// Creates a new `Namespace` from the given ref.
    async fn create_namespace(&self, _provider: &Provider) -> Result<()> {
        Ok(())
    }

    /// Deletes an existing `Namespace` specified by the given ref.
    async fn delete_namespace(&self, _provider: &Provider) -> Result<()> {
        Ok(())
    }

    /// Lists all existing namespaces.
    async fn list_namespaces(&self) -> Result<Vec<Provider>> {
        Ok(vec![])
    }

    /// Marks all entries with ids `<=entry_id` of the given `namespace` as obsolete,
    /// so that the log store can safely delete those entries. This method does not guarantee
    /// that the obsolete entries are deleted immediately.
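    ///
    /// A hypothetical usage sketch (assumes `log_store`, `provider`, and `region_id`
    /// are already set up; not taken from the source):
    ///
    /// ```ignore
    /// // Entries with ids <= 42 for this region may now be garbage collected.
    /// log_store.obsolete(&provider, region_id, 42).await?;
    /// ```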
    async fn obsolete(
        &self,
        provider: &Provider,
        region_id: RegionId,
        entry_id: EntryId,
    ) -> Result<()> {
        if let Some(collector) = self.client_manager.global_index_collector() {
            let provider = provider
                .as_kafka_provider()
                .with_context(|| InvalidProviderSnafu {
                    expected: KafkaProvider::type_name(),
                    actual: provider.type_name(),
                })?;
            collector.truncate(provider, region_id, entry_id).await?;
        }
        Ok(())
    }

    /// Returns the highest entry id of the specified topic in the remote WAL.
    fn latest_entry_id(&self, provider: &Provider) -> Result<EntryId> {
        let provider = provider
            .as_kafka_provider()
            .with_context(|| InvalidProviderSnafu {
                expected: KafkaProvider::type_name(),
                actual: provider.type_name(),
            })?;

        let stat = self
            .topic_stats
            .get(provider)
            .as_deref()
            .copied()
            .unwrap_or_default();

        Ok(stat.latest_offset)
    }

    /// Stops components of the logstore.
    async fn stop(&self) -> Result<()> {
        Ok(())
    }
}

fn check_termination(offset: i64, end_offset: i64) -> bool {
    // Terminates the stream if the entry with the end offset was read.
    if offset >= end_offset {
        debug!("Stream consumer terminates at offset {}", offset);
        // There must be no remaining records when the stream terminates.
        true
    } else {
        false
    }
}

#[cfg(test)]
mod tests {

    use std::assert_matches::assert_matches;
    use std::collections::HashMap;
    use std::sync::Arc;
    use std::time::Duration;

    use common_base::readable_size::ReadableSize;
    use common_meta::datanode::TopicStatsReporter;
    use common_telemetry::info;
    use common_telemetry::tracing::warn;
    use common_wal::config::kafka::common::KafkaConnectionConfig;
    use common_wal::config::kafka::DatanodeKafkaConfig;
    use dashmap::DashMap;
    use futures::TryStreamExt;
    use rand::prelude::SliceRandom;
    use rand::Rng;
    use rskafka::client::partition::OffsetAt;
    use store_api::logstore::entry::{Entry, MultiplePartEntry, MultiplePartHeader, NaiveEntry};
    use store_api::logstore::provider::Provider;
    use store_api::logstore::LogStore;
    use store_api::storage::RegionId;

    use super::build_entry;
    use crate::kafka::log_store::{KafkaLogStore, PeriodicTopicStatsReporter, TopicStat};

    #[test]
    fn test_build_naive_entry() {
        let provider = Provider::kafka_provider("my_topic".to_string());
        let region_id = RegionId::new(1, 1);
        let entry = build_entry(vec![1; 100], 1, region_id, &provider, 120);

        assert_eq!(
            entry.into_naive_entry().unwrap(),
            NaiveEntry {
                provider,
                region_id,
                entry_id: 1,
                data: vec![1; 100]
            }
        )
    }

    #[test]
    fn test_build_into_multiple_part_entry() {
        let provider = Provider::kafka_provider("my_topic".to_string());
        let region_id = RegionId::new(1, 1);
        let entry = build_entry(vec![1; 100], 1, region_id, &provider, 50);

        assert_eq!(
            entry.into_multiple_part_entry().unwrap(),
            MultiplePartEntry {
                provider: provider.clone(),
                region_id,
                entry_id: 1,
                headers: vec![MultiplePartHeader::First, MultiplePartHeader::Last],
                parts: vec![vec![1; 50], vec![1; 50]],
            }
        );

        let region_id = RegionId::new(1, 1);
        let entry = build_entry(vec![1; 100], 1, region_id, &provider, 21);

        assert_eq!(
            entry.into_multiple_part_entry().unwrap(),
            MultiplePartEntry {
                provider,
                region_id,
                entry_id: 1,
                headers: vec![
                    MultiplePartHeader::First,
                    MultiplePartHeader::Middle(1),
                    MultiplePartHeader::Middle(2),
                    MultiplePartHeader::Middle(3),
                    MultiplePartHeader::Last
                ],
                parts: vec![
                    vec![1; 21],
                    vec![1; 21],
                    vec![1; 21],
                    vec![1; 21],
                    vec![1; 16]
                ],
            }
        )
    }

    fn generate_entries(
        logstore: &KafkaLogStore,
        provider: &Provider,
        num_entries: usize,
        region_id: RegionId,
        data_len: usize,
    ) -> Vec<Entry> {
        (0..num_entries)
            .map(|_| {
                let data: Vec<u8> = (0..data_len).map(|_| rand::random::<u8>()).collect();
                // Always set `entry_id` to 0; the real entry_id will be set during the read.
                logstore.entry(data, 0, region_id, provider).unwrap()
            })
            .collect()
    }

    async fn prepare_topic(logstore: &KafkaLogStore, topic_name: &str) {
        let controller_client = logstore.client_manager.controller_client();
        controller_client
            .create_topic(topic_name.to_string(), 1, 1, 5000)
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn test_append_batch_basic() {
        common_telemetry::init_default_ut_logging();
        let Ok(broker_endpoints) = std::env::var("GT_KAFKA_ENDPOINTS") else {
            warn!("The endpoints are empty, skipping the test 'test_append_batch_basic'");
            return;
        };
        let broker_endpoints = broker_endpoints
            .split(',')
            .map(|s| s.trim().to_string())
            .collect::<Vec<_>>();
        let config = DatanodeKafkaConfig {
            connection: KafkaConnectionConfig {
                broker_endpoints,
                ..Default::default()
            },
            max_batch_bytes: ReadableSize::kb(32),
            ..Default::default()
        };
        let logstore = KafkaLogStore::try_new(&config, None).await.unwrap();
        let topic_name = uuid::Uuid::new_v4().to_string();
        prepare_topic(&logstore, &topic_name).await;
        let provider = Provider::kafka_provider(topic_name);

        let region_entries = (0..5)
            .map(|i| {
                let region_id = RegionId::new(1, i);
                (
                    region_id,
                    generate_entries(&logstore, &provider, 20, region_id, 1024),
                )
            })
            .collect::<HashMap<RegionId, Vec<_>>>();

        let mut all_entries = region_entries
            .values()
            .flatten()
            .cloned()
            .collect::<Vec<_>>();
        all_entries.shuffle(&mut rand::rng());

        let response = logstore.append_batch(all_entries.clone()).await.unwrap();
        // 5 regions
        assert_eq!(response.last_entry_ids.len(), 5);
        let got_entries = logstore
            .read(&provider, 0, None)
            .await
            .unwrap()
            .try_collect::<Vec<_>>()
            .await
            .unwrap()
            .into_iter()
            .flatten()
            .collect::<Vec<_>>();
        for (region_id, _) in region_entries {
            let expected_entries = all_entries
                .iter()
                .filter(|entry| entry.region_id() == region_id)
                .cloned()
                .collect::<Vec<_>>();
            let mut actual_entries = got_entries
                .iter()
                .filter(|entry| entry.region_id() == region_id)
                .cloned()
                .collect::<Vec<_>>();
            actual_entries
                .iter_mut()
                .for_each(|entry| entry.set_entry_id(0));
            assert_eq!(expected_entries, actual_entries);
        }
        let latest_entry_id = logstore.latest_entry_id(&provider).unwrap();
        let client = logstore
            .client_manager
            .get_or_insert(provider.as_kafka_provider().unwrap())
            .await
            .unwrap();
        assert_eq!(latest_entry_id, 99);
        // The latest offset is the offset of the last record plus one.
        let latest = client.client().get_offset(OffsetAt::Latest).await.unwrap();
        assert_eq!(latest, 100);
    }

    #[tokio::test]
    async fn test_append_batch_basic_large() {
        common_telemetry::init_default_ut_logging();
        let Ok(broker_endpoints) = std::env::var("GT_KAFKA_ENDPOINTS") else {
            warn!("The endpoints are empty, skipping the test 'test_append_batch_basic_large'");
            return;
        };
        let data_size_kb = rand::rng().random_range(9..31usize);
        info!("Entry size: {}KiB", data_size_kb);
        let broker_endpoints = broker_endpoints
            .split(',')
            .map(|s| s.trim().to_string())
            .collect::<Vec<_>>();
        let config = DatanodeKafkaConfig {
            connection: KafkaConnectionConfig {
                broker_endpoints,
                ..Default::default()
            },
            max_batch_bytes: ReadableSize::kb(8),
            ..Default::default()
        };
        let logstore = KafkaLogStore::try_new(&config, None).await.unwrap();
        let topic_name = uuid::Uuid::new_v4().to_string();
        prepare_topic(&logstore, &topic_name).await;
        let provider = Provider::kafka_provider(topic_name);
        let region_entries = (0..5)
            .map(|i| {
                let region_id = RegionId::new(1, i);
                (
                    region_id,
                    generate_entries(&logstore, &provider, 20, region_id, data_size_kb * 1024),
                )
            })
            .collect::<HashMap<RegionId, Vec<_>>>();

        let mut all_entries = region_entries
            .values()
            .flatten()
            .cloned()
            .collect::<Vec<_>>();
        assert_matches!(all_entries[0], Entry::MultiplePart(_));
        all_entries.shuffle(&mut rand::rng());

        let response = logstore.append_batch(all_entries.clone()).await.unwrap();
        // 5 regions
        assert_eq!(response.last_entry_ids.len(), 5);
        let got_entries = logstore
            .read(&provider, 0, None)
            .await
            .unwrap()
            .try_collect::<Vec<_>>()
            .await
            .unwrap()
            .into_iter()
            .flatten()
            .collect::<Vec<_>>();
        for (region_id, _) in region_entries {
            let expected_entries = all_entries
                .iter()
                .filter(|entry| entry.region_id() == region_id)
                .cloned()
                .collect::<Vec<_>>();
            let mut actual_entries = got_entries
                .iter()
                .filter(|entry| entry.region_id() == region_id)
                .cloned()
                .collect::<Vec<_>>();
            actual_entries
                .iter_mut()
                .for_each(|entry| entry.set_entry_id(0));
            assert_eq!(expected_entries, actual_entries);
        }
        let high_watermark = logstore.latest_entry_id(&provider).unwrap();
        assert_eq!(high_watermark, (data_size_kb as u64 / 8 + 1) * 20 * 5 - 1);
    }

    #[tokio::test]
    async fn test_topic_stats_reporter() {
        common_telemetry::init_default_ut_logging();
        let topic_stats = Arc::new(DashMap::new());
        let provider = Provider::kafka_provider("my_topic".to_string());
        topic_stats.insert(
            provider.as_kafka_provider().unwrap().clone(),
            TopicStat {
                latest_offset: 0,
                record_size: 0,
                record_num: 0,
            },
        );
        let mut reporter = PeriodicTopicStatsReporter::new(topic_stats, Duration::from_secs(1));
        // The first call should return no reportable topics.
        let reportable_topics = reporter.reportable_topics();
        assert_eq!(reportable_topics.len(), 0);

        // After 1 second, the reportable topics should contain the topic in `topic_stats`.
        tokio::time::sleep(Duration::from_secs(1)).await;
        let reportable_topics = reporter.reportable_topics();
        assert_eq!(reportable_topics.len(), 1);

        // Calling it again immediately should return nothing.
        let reportable_topics = reporter.reportable_topics();
        assert_eq!(reportable_topics.len(), 0);
    }
}