use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::{Duration, Instant};

use common_base::readable_size::ReadableSize;
use common_meta::datanode::TopicStatsReporter;
use common_meta::distributed_time_constants::TOPIC_STATS_REPORT_INTERVAL_SECS;
use common_telemetry::{debug, info, warn};
use common_time::util::current_time_millis;
use common_wal::config::kafka::DatanodeKafkaConfig;
use dashmap::DashMap;
use futures::future::try_join_all;
use futures_util::StreamExt;
use rskafka::client::partition::OffsetAt;
use snafu::{OptionExt, ResultExt};
use store_api::logstore::entry::{
    Entry, Id as EntryId, MultiplePartEntry, MultiplePartHeader, NaiveEntry,
};
use store_api::logstore::provider::{KafkaProvider, Provider};
use store_api::logstore::{AppendBatchResponse, LogStore, SendableEntryStream, WalIndex};
use store_api::storage::RegionId;

use crate::error::{self, ConsumeRecordSnafu, Error, GetOffsetSnafu, InvalidProviderSnafu, Result};
use crate::kafka::client_manager::{ClientManager, ClientManagerRef};
use crate::kafka::consumer::{ConsumerBuilder, RecordsBuffer};
use crate::kafka::index::{
    build_region_wal_index_iterator, GlobalIndexCollector, MIN_BATCH_WINDOW_SIZE,
};
use crate::kafka::periodic_offset_fetcher::PeriodicOffsetFetcher;
use crate::kafka::producer::OrderedBatchProducerRef;
use crate::kafka::util::record::{
    convert_to_kafka_records, maybe_emit_entry, remaining_entries, Record, ESTIMATED_META_SIZE,
};
use crate::metrics;

const DEFAULT_OFFSET_FETCH_INTERVAL: Duration = Duration::from_secs(60);

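/// Statistics of a single Kafka topic maintained by the log store.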
#[derive(Debug, Clone, Copy, Default)]
pub struct TopicStat {
    /// The latest offset (entry id) of the topic.
    pub latest_offset: u64,
    /// The accumulated size, in bytes, of records in the topic.
    pub record_size: u64,
    /// The accumulated number of records in the topic.
    pub record_num: u64,
}

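/// A [`LogStore`] implementation backed by Kafka topics.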
#[derive(Debug)]
pub struct KafkaLogStore {
    /// Manages the Kafka clients and producers for each topic.
    client_manager: ClientManagerRef,
    /// The maximum size, in bytes, of a single produce/fetch batch.
    max_batch_bytes: usize,
    /// The maximum amount of time the consumer waits for new records.
    consumer_wait_timeout: Duration,
    /// Whether to clamp a requested start entry id to the topic's earliest offset when reading.
    overwrite_entry_start_id: bool,
    /// Per-topic statistics, shared with the client manager and the stats reporter.
    topic_stats: Arc<DashMap<Arc<KafkaProvider>, TopicStat>>,
}

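/// Reports topic statistics periodically; reports are emitted at most once per
/// `report_interval_millis`, aligned to interval boundaries.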
struct PeriodicTopicStatsReporter {
    topic_stats: Arc<DashMap<Arc<KafkaProvider>, TopicStat>>,
    last_reported_timestamp_millis: i64,
    report_interval_millis: i64,
}

impl PeriodicTopicStatsReporter {
    /// Aligns `ts` down to a multiple of `report_interval_millis`,
    /// e.g. `align_ts(1_234, 1_000) == 1_000`.
    fn align_ts(ts: i64, report_interval_millis: i64) -> i64 {
        (ts / report_interval_millis) * report_interval_millis
    }

    /// Creates a new reporter.
    ///
    /// # Panics
    /// Panics if `report_interval` is zero.
    fn new(
        topic_stats: Arc<DashMap<Arc<KafkaProvider>, TopicStat>>,
        report_interval: Duration,
    ) -> Self {
        assert!(!report_interval.is_zero());
        let report_interval_millis = report_interval.as_millis() as i64;
        let last_reported_timestamp_millis =
            Self::align_ts(current_time_millis(), report_interval_millis);

        Self {
            topic_stats,
            last_reported_timestamp_millis,
            report_interval_millis,
        }
    }
}

impl TopicStatsReporter for PeriodicTopicStatsReporter {
    fn reportable_topics(&mut self) -> Vec<common_meta::datanode::TopicStat> {
        let now = Self::align_ts(current_time_millis(), self.report_interval_millis);
        if now < self.last_reported_timestamp_millis + self.report_interval_millis {
            debug!("Skip reporting topic stats because the report interval has not elapsed");
            return vec![];
        }

        self.last_reported_timestamp_millis = now;
        let mut reportable_topics = Vec::with_capacity(self.topic_stats.len());
        for e in self.topic_stats.iter() {
            let topic_stat = e.value();
            let topic_stat = common_meta::datanode::TopicStat {
                topic: e.key().topic.clone(),
                latest_entry_id: topic_stat.latest_offset,
                record_size: topic_stat.record_size,
                record_num: topic_stat.record_num,
            };
            debug!("Reportable topic: {:?}", topic_stat);
            reportable_topics.push(topic_stat);
        }
        debug!("Reportable {} topics at {}", reportable_topics.len(), now);
        reportable_topics
    }
}

impl KafkaLogStore {
    /// Tries to create a Kafka log store and spawns the periodic offset fetcher.
    pub async fn try_new(
        config: &DatanodeKafkaConfig,
        global_index_collector: Option<GlobalIndexCollector>,
    ) -> Result<Self> {
        let topic_stats = Arc::new(DashMap::new());
        let client_manager = Arc::new(
            ClientManager::try_new(config, global_index_collector, topic_stats.clone()).await?,
        );
        let fetcher =
            PeriodicOffsetFetcher::new(DEFAULT_OFFSET_FETCH_INTERVAL, client_manager.clone());
        fetcher.run().await;

        Ok(Self {
            client_manager,
            max_batch_bytes: config.max_batch_bytes.as_bytes() as usize,
            consumer_wait_timeout: config.consumer_wait_timeout,
            overwrite_entry_start_id: config.overwrite_entry_start_id,
            topic_stats,
        })
    }

    /// Returns a [`TopicStatsReporter`] that reports topic stats at most once per
    /// [`TOPIC_STATS_REPORT_INTERVAL_SECS`].
    pub fn topic_stats_reporter(&self) -> Box<dyn TopicStatsReporter> {
        Box::new(PeriodicTopicStatsReporter::new(
            self.topic_stats.clone(),
            Duration::from_secs(TOPIC_STATS_REPORT_INTERVAL_SECS),
        ))
    }
}

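/// Builds an [`Entry`] from `data`.
///
/// Returns a [`NaiveEntry`] if `data` fits into `max_data_size`; otherwise splits the data
/// into `max_data_size`-sized chunks and returns a [`MultiplePartEntry`] whose headers mark
/// the first, middle and last parts.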
fn build_entry(
    data: Vec<u8>,
    entry_id: EntryId,
    region_id: RegionId,
    provider: &Provider,
    max_data_size: usize,
) -> Entry {
    if data.len() <= max_data_size {
        Entry::Naive(NaiveEntry {
            provider: provider.clone(),
            region_id,
            entry_id,
            data,
        })
    } else {
        let parts = data
            .chunks(max_data_size)
            .map(|s| s.into())
            .collect::<Vec<_>>();
        let num_parts = parts.len();

        let mut headers = Vec::with_capacity(num_parts);
        headers.push(MultiplePartHeader::First);
        headers.extend((1..num_parts - 1).map(MultiplePartHeader::Middle));
        headers.push(MultiplePartHeader::Last);

        Entry::MultiplePart(MultiplePartEntry {
            provider: provider.clone(),
            region_id,
            entry_id,
            headers,
            parts,
        })
    }
}

#[async_trait::async_trait]
impl LogStore for KafkaLogStore {
    type Error = Error;

    fn entry(
        &self,
        data: Vec<u8>,
        entry_id: EntryId,
        region_id: RegionId,
        provider: &Provider,
    ) -> Result<Entry> {
        provider
            .as_kafka_provider()
            .with_context(|| InvalidProviderSnafu {
                expected: KafkaProvider::type_name(),
                actual: provider.type_name(),
            })?;

        let max_data_size = self.max_batch_bytes - ESTIMATED_META_SIZE;
        Ok(build_entry(
            data,
            entry_id,
            region_id,
            provider,
            max_data_size,
        ))
    }

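    /// Appends a batch of entries, producing the records of each region through its
    /// topic's producer, and returns the last entry id written for each region.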
    async fn append_batch(&self, entries: Vec<Entry>) -> Result<AppendBatchResponse> {
        metrics::METRIC_KAFKA_APPEND_BATCH_BYTES_TOTAL.inc_by(
            entries
                .iter()
                .map(|entry| entry.estimated_size())
                .sum::<usize>() as u64,
        );
        let _timer = metrics::METRIC_KAFKA_APPEND_BATCH_ELAPSED.start_timer();

        if entries.is_empty() {
            return Ok(AppendBatchResponse::default());
        }

        // Groups the entries by region and converts them into Kafka records.
        let region_ids = entries
            .iter()
            .map(|entry| entry.region_id())
            .collect::<HashSet<_>>();
        let mut region_grouped_records: HashMap<RegionId, (OrderedBatchProducerRef, Vec<_>)> =
            HashMap::with_capacity(region_ids.len());
        let mut region_to_provider = HashMap::with_capacity(region_ids.len());
        for entry in entries {
            let provider = entry.provider().as_kafka_provider().with_context(|| {
                error::InvalidProviderSnafu {
                    expected: KafkaProvider::type_name(),
                    actual: entry.provider().type_name(),
                }
            })?;
            region_to_provider.insert(entry.region_id(), provider.clone());
            let region_id = entry.region_id();
            match region_grouped_records.entry(region_id) {
                std::collections::hash_map::Entry::Occupied(mut slot) => {
                    slot.get_mut().1.extend(convert_to_kafka_records(entry)?);
                }
                std::collections::hash_map::Entry::Vacant(slot) => {
                    let producer = self
                        .client_manager
                        .get_or_insert(provider)
                        .await?
                        .producer()
                        .clone();

                    slot.insert((producer, convert_to_kafka_records(entry)?));
                }
            }
        }

        // Produces the records of each region and collects the result receivers.
        let mut region_grouped_result_receivers = Vec::with_capacity(region_ids.len());
        for (region_id, (producer, records)) in region_grouped_records {
            region_grouped_result_receivers
                .push((region_id, producer.produce(region_id, records).await?))
        }

        // Waits for all produce requests and records the max offset per region.
        let region_grouped_max_offset =
            try_join_all(region_grouped_result_receivers.into_iter().map(
                |(region_id, receiver)| async move {
                    receiver.wait().await.map(|offset| (region_id, offset))
                },
            ))
            .await?;
        debug!(
            "Appended batch to Kafka, region_grouped_max_offset: {:?}",
            region_grouped_max_offset
        );

        Ok(AppendBatchResponse {
            last_entry_ids: region_grouped_max_offset.into_iter().collect(),
        })
    }

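    /// Creates a stream of entries for the given `provider`, starting from `entry_id`.
    ///
    /// If `overwrite_entry_start_id` is set and `entry_id` is earlier than the topic's
    /// earliest offset, reading starts from the earliest offset instead.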
    async fn read(
        &self,
        provider: &Provider,
        mut entry_id: EntryId,
        index: Option<WalIndex>,
    ) -> Result<SendableEntryStream<'static, Entry, Self::Error>> {
        let provider = provider
            .as_kafka_provider()
            .with_context(|| InvalidProviderSnafu {
                expected: KafkaProvider::type_name(),
                actual: provider.type_name(),
            })?;

        let _timer = metrics::METRIC_KAFKA_READ_ELAPSED.start_timer();

        let client = self
            .client_manager
            .get_or_insert(provider)
            .await?
            .client()
            .clone();

        if self.overwrite_entry_start_id {
            let start_offset =
                client
                    .get_offset(OffsetAt::Earliest)
                    .await
                    .context(GetOffsetSnafu {
                        topic: &provider.topic,
                    })?;

            if entry_id as i64 <= start_offset {
                warn!(
                    "The entry_id: {} is less than or equal to start_offset: {}, topic: {}. Overwriting entry_id with start_offset",
                    entry_id, start_offset, &provider.topic
                );

                entry_id = start_offset as u64;
            }
        }

        // `OffsetAt::Latest` returns the offset of the next record to be produced,
        // so the last existing record is at `end_offset - 1`.
        let end_offset = client
            .get_offset(OffsetAt::Latest)
            .await
            .context(GetOffsetSnafu {
                topic: &provider.topic,
            })?;
        let latest_offset = (end_offset as u64).saturating_sub(1);
        self.topic_stats
            .entry(provider.clone())
            .and_modify(|stat| {
                stat.latest_offset = stat.latest_offset.max(latest_offset);
            })
            .or_insert_with(|| TopicStat {
                latest_offset,
                record_size: 0,
                record_num: 0,
            });

        let region_indexes = if let (Some(index), Some(collector)) =
            (index, self.client_manager.global_index_collector())
        {
            collector
                .read_remote_region_index(index.location_id, provider, index.region_id, entry_id)
                .await?
        } else {
            None
        };

        let Some(iterator) = build_region_wal_index_iterator(
            entry_id,
            end_offset as u64,
            region_indexes,
            self.max_batch_bytes,
            MIN_BATCH_WINDOW_SIZE,
        ) else {
            let range = entry_id..end_offset as u64;
            warn!("No new entries in range {:?} of ns {}", range, provider);
            return Ok(futures_util::stream::empty().boxed());
        };

        debug!("Reading entries with {:?} of ns {}", iterator, provider);

        let mut stream_consumer = ConsumerBuilder::default()
            .client(client)
            .buffer(RecordsBuffer::new(iterator))
            .max_batch_size(self.max_batch_bytes)
            .max_wait_ms(self.consumer_wait_timeout.as_millis() as u32)
            .build()
            .unwrap();

        // Buffers the records of entries that have not been fully read yet, grouped by region.
        let mut entry_records: HashMap<RegionId, Vec<Record>> = HashMap::new();
        let provider = provider.clone();
        let stream = async_stream::stream!({
            let now = Instant::now();
            while let Some(consume_result) = stream_consumer.next().await {
                let (record_and_offset, high_watermark) =
                    match consume_result.context(ConsumeRecordSnafu {
                        topic: &provider.topic,
                    }) {
                        Ok(result) => result,
                        Err(e) => {
                            // Yields the error to the caller and stops the stream.
                            yield Err(e);
                            break;
                        }
                    };
                let (kafka_record, offset) = (record_and_offset.record, record_and_offset.offset);

                debug!(
                    "Read a record at offset {} for topic {}, high watermark: {}",
                    offset, provider.topic, high_watermark
                );

                // Skips records without a payload, but still checks whether
                // the stream has reached the end offset.
                if kafka_record.value.is_none() {
                    if check_termination(offset, end_offset) {
                        if let Some(entries) = remaining_entries(&provider, &mut entry_records) {
                            yield Ok(entries);
                        }
                        break;
                    }
                    continue;
                }

                let record = match Record::try_from(kafka_record) {
                    Ok(record) => record,
                    Err(e) => {
                        yield Err(e);
                        break;
                    }
                };
                // Yields an entry as soon as all of its parts have been read.
                match maybe_emit_entry(&provider, record, &mut entry_records) {
                    Ok(Some(mut entry)) => {
                        entry.set_entry_id(offset as u64);
                        yield Ok(vec![entry]);
                    }
                    Ok(None) => {}
                    Err(e) => {
                        yield Err(e);
                        break;
                    }
                }

                if check_termination(offset, end_offset) {
                    if let Some(entries) = remaining_entries(&provider, &mut entry_records) {
                        yield Ok(entries);
                    }
                    break;
                }
            }

            metrics::METRIC_KAFKA_READ_BYTES_TOTAL.inc_by(stream_consumer.total_fetched_bytes());

            info!(
                "Fetched {} bytes from topic: {}, start_entry_id: {}, end_offset: {}, elapsed: {:?}",
                ReadableSize(stream_consumer.total_fetched_bytes()),
                stream_consumer.topic(),
                entry_id,
                end_offset,
                now.elapsed()
            );
        });
        Ok(Box::pin(stream))
    }

    /// Creating a namespace is a no-op for the Kafka log store.
    async fn create_namespace(&self, _provider: &Provider) -> Result<()> {
        Ok(())
    }

    /// Deleting a namespace is a no-op for the Kafka log store.
    async fn delete_namespace(&self, _provider: &Provider) -> Result<()> {
        Ok(())
    }

    /// Listing namespaces is not supported; always returns an empty list.
    async fn list_namespaces(&self) -> Result<Vec<Provider>> {
        Ok(vec![])
    }

    /// Marks entries up to `entry_id` of the given region as obsolete. If a global index
    /// collector is configured, the region's WAL index is truncated accordingly.
    async fn obsolete(
        &self,
        provider: &Provider,
        region_id: RegionId,
        entry_id: EntryId,
    ) -> Result<()> {
        if let Some(collector) = self.client_manager.global_index_collector() {
            let provider = provider
                .as_kafka_provider()
                .with_context(|| InvalidProviderSnafu {
                    expected: KafkaProvider::type_name(),
                    actual: provider.type_name(),
                })?;
            collector.truncate(provider, region_id, entry_id).await?;
        }
        Ok(())
    }

    /// Returns the latest entry id observed for the topic, based on the in-memory topic stats.
    fn latest_entry_id(&self, provider: &Provider) -> Result<EntryId> {
        let provider = provider
            .as_kafka_provider()
            .with_context(|| InvalidProviderSnafu {
                expected: KafkaProvider::type_name(),
                actual: provider.type_name(),
            })?;

        let stat = self
            .topic_stats
            .get(provider)
            .as_deref()
            .copied()
            .unwrap_or_default();

        Ok(stat.latest_offset)
    }

    async fn stop(&self) -> Result<()> {
        Ok(())
    }
}

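/// Returns `true` if the stream consumer has reached `end_offset` and should terminate.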
fn check_termination(offset: i64, end_offset: i64) -> bool {
    if offset >= end_offset {
        debug!("Stream consumer terminates at offset {}", offset);
        true
    } else {
        false
    }
}

#[cfg(test)]
mod tests {

    use std::assert_matches::assert_matches;
    use std::collections::HashMap;
    use std::sync::Arc;
    use std::time::Duration;

    use common_base::readable_size::ReadableSize;
    use common_meta::datanode::TopicStatsReporter;
    use common_telemetry::info;
    use common_telemetry::tracing::warn;
    use common_wal::config::kafka::common::KafkaConnectionConfig;
    use common_wal::config::kafka::DatanodeKafkaConfig;
    use dashmap::DashMap;
    use futures::TryStreamExt;
    use rand::prelude::SliceRandom;
    use rand::Rng;
    use rskafka::client::partition::OffsetAt;
    use store_api::logstore::entry::{Entry, MultiplePartEntry, MultiplePartHeader, NaiveEntry};
    use store_api::logstore::provider::Provider;
    use store_api::logstore::LogStore;
    use store_api::storage::RegionId;

    use super::build_entry;
    use crate::kafka::log_store::{KafkaLogStore, PeriodicTopicStatsReporter, TopicStat};

    #[test]
    fn test_build_naive_entry() {
        let provider = Provider::kafka_provider("my_topic".to_string());
        let region_id = RegionId::new(1, 1);
        let entry = build_entry(vec![1; 100], 1, region_id, &provider, 120);

        assert_eq!(
            entry.into_naive_entry().unwrap(),
            NaiveEntry {
                provider,
                region_id,
                entry_id: 1,
                data: vec![1; 100]
            }
        )
    }

    #[test]
    fn test_build_into_multiple_part_entry() {
        let provider = Provider::kafka_provider("my_topic".to_string());
        let region_id = RegionId::new(1, 1);
        let entry = build_entry(vec![1; 100], 1, region_id, &provider, 50);

        assert_eq!(
            entry.into_multiple_part_entry().unwrap(),
            MultiplePartEntry {
                provider: provider.clone(),
                region_id,
                entry_id: 1,
                headers: vec![MultiplePartHeader::First, MultiplePartHeader::Last],
                parts: vec![vec![1; 50], vec![1; 50]],
            }
        );

        let region_id = RegionId::new(1, 1);
        let entry = build_entry(vec![1; 100], 1, region_id, &provider, 21);

        assert_eq!(
            entry.into_multiple_part_entry().unwrap(),
            MultiplePartEntry {
                provider,
                region_id,
                entry_id: 1,
                headers: vec![
                    MultiplePartHeader::First,
                    MultiplePartHeader::Middle(1),
                    MultiplePartHeader::Middle(2),
                    MultiplePartHeader::Middle(3),
                    MultiplePartHeader::Last
                ],
                parts: vec![
                    vec![1; 21],
                    vec![1; 21],
                    vec![1; 21],
                    vec![1; 21],
                    vec![1; 16]
                ],
            }
        )
    }

    fn generate_entries(
        logstore: &KafkaLogStore,
        provider: &Provider,
        num_entries: usize,
        region_id: RegionId,
        data_len: usize,
    ) -> Vec<Entry> {
        (0..num_entries)
            .map(|_| {
                let data: Vec<u8> = (0..data_len).map(|_| rand::random::<u8>()).collect();
                logstore.entry(data, 0, region_id, provider).unwrap()
            })
            .collect()
    }

    async fn prepare_topic(logstore: &KafkaLogStore, topic_name: &str) {
        let controller_client = logstore.client_manager.controller_client();
        controller_client
            .create_topic(topic_name.to_string(), 1, 1, 5000)
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn test_append_batch_basic() {
        common_telemetry::init_default_ut_logging();
        let Ok(broker_endpoints) = std::env::var("GT_KAFKA_ENDPOINTS") else {
            warn!("The endpoints are empty, skipping the test 'test_append_batch_basic'");
            return;
        };
        let broker_endpoints = broker_endpoints
            .split(',')
            .map(|s| s.trim().to_string())
            .collect::<Vec<_>>();
        let config = DatanodeKafkaConfig {
            connection: KafkaConnectionConfig {
                broker_endpoints,
                ..Default::default()
            },
            max_batch_bytes: ReadableSize::kb(32),
            ..Default::default()
        };
        let logstore = KafkaLogStore::try_new(&config, None).await.unwrap();
        let topic_name = uuid::Uuid::new_v4().to_string();
        prepare_topic(&logstore, &topic_name).await;
        let provider = Provider::kafka_provider(topic_name);

        let region_entries = (0..5)
            .map(|i| {
                let region_id = RegionId::new(1, i);
                (
                    region_id,
                    generate_entries(&logstore, &provider, 20, region_id, 1024),
                )
            })
            .collect::<HashMap<RegionId, Vec<_>>>();

        let mut all_entries = region_entries
            .values()
            .flatten()
            .cloned()
            .collect::<Vec<_>>();
        all_entries.shuffle(&mut rand::rng());

        let response = logstore.append_batch(all_entries.clone()).await.unwrap();
        // Entries of 5 regions were appended, so there are 5 last entry ids.
        assert_eq!(response.last_entry_ids.len(), 5);
        let got_entries = logstore
            .read(&provider, 0, None)
            .await
            .unwrap()
            .try_collect::<Vec<_>>()
            .await
            .unwrap()
            .into_iter()
            .flatten()
            .collect::<Vec<_>>();
        for (region_id, _) in region_entries {
            let expected_entries = all_entries
                .iter()
                .filter(|entry| entry.region_id() == region_id)
                .cloned()
                .collect::<Vec<_>>();
            let mut actual_entries = got_entries
                .iter()
                .filter(|entry| entry.region_id() == region_id)
                .cloned()
                .collect::<Vec<_>>();
            actual_entries
                .iter_mut()
                .for_each(|entry| entry.set_entry_id(0));
            assert_eq!(expected_entries, actual_entries);
        }
        // 100 entries were appended, so the latest entry id (offset) is 99.
        let latest_entry_id = logstore.latest_entry_id(&provider).unwrap();
        let client = logstore
            .client_manager
            .get_or_insert(provider.as_kafka_provider().unwrap())
            .await
            .unwrap();
        assert_eq!(latest_entry_id, 99);
        // `OffsetAt::Latest` returns the offset of the next record to be produced.
        let latest = client.client().get_offset(OffsetAt::Latest).await.unwrap();
        assert_eq!(latest, 100);
    }

    #[tokio::test]
    async fn test_append_batch_basic_large() {
        common_telemetry::init_default_ut_logging();
        let Ok(broker_endpoints) = std::env::var("GT_KAFKA_ENDPOINTS") else {
            warn!("The endpoints are empty, skipping the test 'test_append_batch_basic_large'");
            return;
        };
        let data_size_kb = rand::rng().random_range(9..31usize);
        info!("Entry size: {} KiB", data_size_kb);
        let broker_endpoints = broker_endpoints
            .split(',')
            .map(|s| s.trim().to_string())
            .collect::<Vec<_>>();
        let config = DatanodeKafkaConfig {
            connection: KafkaConnectionConfig {
                broker_endpoints,
                ..Default::default()
            },
            max_batch_bytes: ReadableSize::kb(8),
            ..Default::default()
        };
        let logstore = KafkaLogStore::try_new(&config, None).await.unwrap();
        let topic_name = uuid::Uuid::new_v4().to_string();
        prepare_topic(&logstore, &topic_name).await;
        let provider = Provider::kafka_provider(topic_name);
        let region_entries = (0..5)
            .map(|i| {
                let region_id = RegionId::new(1, i);
                (
                    region_id,
                    generate_entries(&logstore, &provider, 20, region_id, data_size_kb * 1024),
                )
            })
            .collect::<HashMap<RegionId, Vec<_>>>();

        let mut all_entries = region_entries
            .values()
            .flatten()
            .cloned()
            .collect::<Vec<_>>();
        assert_matches!(all_entries[0], Entry::MultiplePart(_));
        all_entries.shuffle(&mut rand::rng());

        let response = logstore.append_batch(all_entries.clone()).await.unwrap();
        // Entries of 5 regions were appended, so there are 5 last entry ids.
        assert_eq!(response.last_entry_ids.len(), 5);
        let got_entries = logstore
            .read(&provider, 0, None)
            .await
            .unwrap()
            .try_collect::<Vec<_>>()
            .await
            .unwrap()
            .into_iter()
            .flatten()
            .collect::<Vec<_>>();
        for (region_id, _) in region_entries {
            let expected_entries = all_entries
                .iter()
                .filter(|entry| entry.region_id() == region_id)
                .cloned()
                .collect::<Vec<_>>();
            let mut actual_entries = got_entries
                .iter()
                .filter(|entry| entry.region_id() == region_id)
                .cloned()
                .collect::<Vec<_>>();
            actual_entries
                .iter_mut()
                .for_each(|entry| entry.set_entry_id(0));
            assert_eq!(expected_entries, actual_entries);
        }
        // Each entry is split into `data_size_kb / 8 + 1` records, so the last offset is
        // `parts_per_entry * 20 entries * 5 regions - 1`.
        let latest_entry_id = logstore.latest_entry_id(&provider).unwrap();
        assert_eq!(latest_entry_id, (data_size_kb as u64 / 8 + 1) * 20 * 5 - 1);
    }

    #[tokio::test]
    async fn test_topic_stats_reporter() {
        common_telemetry::init_default_ut_logging();
        let topic_stats = Arc::new(DashMap::new());
        let provider = Provider::kafka_provider("my_topic".to_string());
        topic_stats.insert(
            provider.as_kafka_provider().unwrap().clone(),
            TopicStat {
                latest_offset: 0,
                record_size: 0,
                record_num: 0,
            },
        );
        let mut reporter = PeriodicTopicStatsReporter::new(topic_stats, Duration::from_secs(1));
        // The first call happens within the initial interval, so nothing is reportable yet.
        let reportable_topics = reporter.reportable_topics();
        assert_eq!(reportable_topics.len(), 0);

        // After one interval has elapsed, the topic becomes reportable.
        tokio::time::sleep(Duration::from_secs(1)).await;
        let reportable_topics = reporter.reportable_topics();
        assert_eq!(reportable_topics.len(), 1);

        // Reporting again within the same interval yields nothing.
        let reportable_topics = reporter.reportable_topics();
        assert_eq!(reportable_topics.len(), 0);
    }
}
847}