use std::collections::{BTreeSet, HashMap};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;

use common_telemetry::{error, info};
use futures::future::try_join_all;
use object_store::ErrorKind;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use store_api::logstore::EntryId;
use store_api::logstore::provider::KafkaProvider;
use store_api::storage::RegionId;
use tokio::select;
use tokio::sync::Mutex as TokioMutex;
use tokio::sync::mpsc::Sender;

use crate::error::{self, Result};
use crate::kafka::index::encoder::{DatanodeWalIndexes, IndexEncoder};
use crate::kafka::index::{JsonIndexEncoder, default_index_file};
use crate::kafka::worker::{DumpIndexRequest, TruncateIndexRequest, WorkerRequest};

/// Collects the WAL entry ids produced for the regions of a topic.
pub trait IndexCollector: Send + Sync {
    /// Appends an [`EntryId`] for the given region.
    fn append(&mut self, region_id: RegionId, entry_id: EntryId);

    /// Truncates the index of the given region, removing all entry ids
    /// smaller than `entry_id`.
    fn truncate(&mut self, region_id: RegionId, entry_id: EntryId);

    /// Sets the latest [`EntryId`] observed for the topic.
    fn set_latest_entry_id(&mut self, entry_id: EntryId);

    /// Dumps the collected indexes via the given [`IndexEncoder`].
    fn dump(&mut self, encoder: &dyn IndexEncoder);
}

/// Tracks the request sender of every Kafka provider's background worker and
/// owns the background task that periodically persists the collected WAL
/// indexes to the object store.
#[derive(Debug)]
pub struct GlobalIndexCollector {
    providers: Arc<TokioMutex<HashMap<Arc<KafkaProvider>, Sender<WorkerRequest>>>>,
    operator: object_store::ObjectStore,
    _handle: CollectionTaskHandle,
}

/// The background task that asks every registered worker to dump its index
/// and writes the encoded result to `path` every `dump_index_interval`.
#[derive(Debug, Clone)]
pub struct CollectionTask {
    providers: Arc<TokioMutex<HashMap<Arc<KafkaProvider>, Sender<WorkerRequest>>>>,
    dump_index_interval: Duration,
    operator: object_store::ObjectStore,
    path: String,
    running: Arc<AtomicBool>,
}

impl CollectionTask {
    /// Sends a [`DumpIndexRequest`] to every registered worker, waits for all
    /// of them, and writes the encoded index to `self.path`.
    async fn dump_index(&self) -> Result<()> {
        let encoder = Arc::new(JsonIndexEncoder::default());
        let receivers = {
            let providers = self.providers.lock().await;
            let mut receivers = Vec::with_capacity(providers.len());
            for (provider, sender) in providers.iter() {
                let (req, rx) = DumpIndexRequest::new(encoder.clone());
                receivers.push(rx);
                if sender.send(WorkerRequest::DumpIndex(req)).await.is_err() {
                    error!(
                        "BackgroundProducerWorker is stopped, topic: {}",
                        provider.topic
                    )
                }
            }
            receivers
        };
        try_join_all(receivers)
            .await
            .context(error::WaitDumpIndexSnafu)?;
        let bytes = encoder.finish()?;
        let mut writer = self
            .operator
            .writer(&self.path)
            .await
            .context(error::CreateWriterSnafu)?;
        writer.write(bytes).await.context(error::WriteIndexSnafu)?;
        writer.close().await.context(error::WriteIndexSnafu)?;

        Ok(())
    }

    /// Spawns the collection loop on the global runtime and returns a
    /// [`CollectionTaskHandle`]; the loop stops after the handle is dropped.
    fn run(self) -> CollectionTaskHandle {
        let mut dump_index_interval = tokio::time::interval(self.dump_index_interval);
        let running = self.running.clone();
        let moved_self = self.clone();
        common_runtime::spawn_global(async move {
            loop {
                if !running.load(Ordering::Relaxed) {
                    info!("shutdown the index collection task");
                    break;
                }
                select! {
                    _ = dump_index_interval.tick() => {
                        if let Err(err) = moved_self.dump_index().await {
                            error!(err; "Failed to persist the WAL index");
                        }
                    },
                }
            }
        });
        CollectionTaskHandle {
            running: self.running.clone(),
        }
    }
}

impl Drop for CollectionTaskHandle {
    fn drop(&mut self) {
        // Signal the background loop to stop on its next iteration.
        self.running.store(false, Ordering::Relaxed);
    }
}

/// Handle of the background [`CollectionTask`]; dropping it stops the task.
#[derive(Debug, Default)]
struct CollectionTaskHandle {
    running: Arc<AtomicBool>,
}

impl GlobalIndexCollector {
    /// Creates a [`GlobalIndexCollector`] and spawns the background task that
    /// dumps the collected indexes to `path` every `dump_index_interval`.
    pub fn new(
        dump_index_interval: Duration,
        operator: object_store::ObjectStore,
        path: String,
    ) -> Self {
        let providers: Arc<TokioMutex<HashMap<Arc<KafkaProvider>, Sender<WorkerRequest>>>> =
            Arc::new(Default::default());
        let task = CollectionTask {
            providers: providers.clone(),
            dump_index_interval,
            operator: operator.clone(),
            path,
            running: Arc::new(AtomicBool::new(true)),
        };
        let handle = task.run();
        Self {
            providers,
            operator,
            _handle: handle,
        }
    }

    /// Creates a [`GlobalIndexCollector`] for tests without spawning the
    /// background collection task.
    #[cfg(test)]
    pub fn new_for_test(operator: object_store::ObjectStore) -> Self {
        Self {
            providers: Default::default(),
            operator,
            _handle: Default::default(),
        }
    }
}

impl GlobalIndexCollector {
    /// Reads the remote WAL index identified by `location_id`, returning the
    /// entry ids of `region_id` at or after `entry_id` together with the
    /// latest entry id of the topic, or `None` if the index file or the
    /// provider is absent.
    pub(crate) async fn read_remote_region_index(
        &self,
        location_id: u64,
        provider: &KafkaProvider,
        region_id: RegionId,
        entry_id: EntryId,
    ) -> Result<Option<(BTreeSet<EntryId>, EntryId)>> {
        let path = default_index_file(location_id);

        let bytes = match self.operator.read(&path).await {
            Ok(bytes) => bytes.to_vec(),
            Err(err) => {
                if err.kind() == ErrorKind::NotFound {
                    return Ok(None);
                } else {
                    return Err(err).context(error::ReadIndexSnafu { path });
                }
            }
        };

        match DatanodeWalIndexes::decode(&bytes)?.provider(provider) {
            Some(indexes) => {
                let last_index = indexes.last_index();
                let indexes = indexes
                    .region(region_id)
                    .unwrap_or_default()
                    .split_off(&entry_id);

                Ok(Some((indexes, last_index)))
            }
            None => Ok(None),
        }
    }

    /// Registers the worker `sender` for `provider` and returns a
    /// provider-level [`IndexCollector`].
    pub(crate) async fn provider_level_index_collector(
        &self,
        provider: Arc<KafkaProvider>,
        sender: Sender<WorkerRequest>,
    ) -> Box<dyn IndexCollector> {
        self.providers.lock().await.insert(provider.clone(), sender);
        Box::new(ProviderLevelIndexCollector {
            indexes: Default::default(),
            provider,
        })
    }

    /// Sends a [`TruncateIndexRequest`] to the worker of `provider`,
    /// truncating the index of `region_id` up to `entry_id`.
    pub(crate) async fn truncate(
        &self,
        provider: &Arc<KafkaProvider>,
        region_id: RegionId,
        entry_id: EntryId,
    ) -> Result<()> {
        if let Some(sender) = self.providers.lock().await.get(provider).cloned()
            && sender
                .send(WorkerRequest::TruncateIndex(TruncateIndexRequest::new(
                    region_id, entry_id,
                )))
                .await
                .is_err()
        {
            return error::OrderedBatchProducerStoppedSnafu {}.fail();
        }

        Ok(())
    }
}

/// The in-memory WAL index of the regions in a topic, together with the
/// latest (largest) entry id observed for the topic.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct RegionIndexes {
    pub(crate) regions: HashMap<RegionId, BTreeSet<EntryId>>,
    pub(crate) latest_entry_id: EntryId,
}

impl RegionIndexes {
    fn append(&mut self, region_id: RegionId, entry_id: EntryId) {
        self.regions.entry(region_id).or_default().insert(entry_id);
        self.latest_entry_id = self.latest_entry_id.max(entry_id);
    }

    fn truncate(&mut self, region_id: RegionId, entry_id: EntryId) {
        if let Some(entry_ids) = self.regions.get_mut(&region_id) {
            *entry_ids = entry_ids.split_off(&entry_id);
            // `split_off` keeps the entry ids at or after `entry_id`; the
            // latest entry id is advanced to the truncation point if needed.
            self.latest_entry_id = self.latest_entry_id.max(entry_id);
        }
    }

    fn set_latest_entry_id(&mut self, entry_id: EntryId) {
        self.latest_entry_id = entry_id;
    }
}

/// The [`IndexCollector`] for a single Kafka provider (topic).
#[derive(Debug, Clone)]
pub struct ProviderLevelIndexCollector {
    indexes: RegionIndexes,
    provider: Arc<KafkaProvider>,
}

impl IndexCollector for ProviderLevelIndexCollector {
    fn append(&mut self, region_id: RegionId, entry_id: EntryId) {
        self.indexes.append(region_id, entry_id)
    }

    fn truncate(&mut self, region_id: RegionId, entry_id: EntryId) {
        self.indexes.truncate(region_id, entry_id)
    }

    fn set_latest_entry_id(&mut self, entry_id: EntryId) {
        self.indexes.set_latest_entry_id(entry_id);
    }

    fn dump(&mut self, encoder: &dyn IndexEncoder) {
        encoder.encode(&self.provider, &self.indexes)
    }
}

/// An [`IndexCollector`] that ignores all operations.
pub struct NoopCollector;

impl IndexCollector for NoopCollector {
    fn append(&mut self, _region_id: RegionId, _entry_id: EntryId) {}

    fn truncate(&mut self, _region_id: RegionId, _entry_id: EntryId) {}

    fn set_latest_entry_id(&mut self, _entry_id: EntryId) {}

    fn dump(&mut self, _encoder: &dyn IndexEncoder) {}
}

#[cfg(test)]
mod tests {
    use std::collections::{BTreeSet, HashMap};

    use store_api::logstore::provider::KafkaProvider;
    use store_api::storage::RegionId;

    use crate::kafka::index::JsonIndexEncoder;
    use crate::kafka::index::collector::RegionIndexes;
    use crate::kafka::index::encoder::IndexEncoder;
    use crate::kafka::{GlobalIndexCollector, default_index_file};

340 #[tokio::test]
341 async fn test_read_remote_region_index() {
342 let operator = object_store::ObjectStore::new(object_store::services::Memory::default())
343 .unwrap()
344 .finish();
345
346 let path = default_index_file(0);
347 let encoder = JsonIndexEncoder::default();
348 encoder.encode(
349 &KafkaProvider::new("my_topic_0".to_string()),
350 &RegionIndexes {
351 regions: HashMap::from([(RegionId::new(1, 1), BTreeSet::from([1, 5, 15]))]),
352 latest_entry_id: 20,
353 },
354 );
355 let bytes = encoder.finish().unwrap();
356 let mut writer = operator.writer(&path).await.unwrap();
357 writer.write(bytes).await.unwrap();
358 writer.close().await.unwrap();
359
360 let collector = GlobalIndexCollector::new_for_test(operator.clone());
361 let result = collector
363 .read_remote_region_index(
364 1,
365 &KafkaProvider::new("my_topic_0".to_string()),
366 RegionId::new(1, 1),
367 1,
368 )
369 .await
370 .unwrap();
371 assert!(result.is_none());
372
373 let (indexes, last_index) = collector
375 .read_remote_region_index(
376 0,
377 &KafkaProvider::new("my_topic_0".to_string()),
378 RegionId::new(1, 2),
379 5,
380 )
381 .await
382 .unwrap()
383 .unwrap();
384 assert_eq!(indexes, BTreeSet::new());
385 assert_eq!(last_index, 20);
386
387 let (indexes, last_index) = collector
389 .read_remote_region_index(
390 0,
391 &KafkaProvider::new("my_topic_0".to_string()),
392 RegionId::new(1, 1),
393 5,
394 )
395 .await
396 .unwrap()
397 .unwrap();
398 assert_eq!(indexes, BTreeSet::from([5, 15]));
399 assert_eq!(last_index, 20);
400
401 let (indexes, last_index) = collector
403 .read_remote_region_index(
404 0,
405 &KafkaProvider::new("my_topic_0".to_string()),
406 RegionId::new(1, 1),
407 20,
408 )
409 .await
410 .unwrap()
411 .unwrap();
412 assert_eq!(indexes, BTreeSet::new());
413 assert_eq!(last_index, 20);
414 }
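
    // A hedged sketch of a round trip through `ProviderLevelIndexCollector`:
    // entries are collected via the `IndexCollector` trait, dumped with
    // `JsonIndexEncoder`, and decoded again with `DatanodeWalIndexes`. It only
    // relies on the APIs used elsewhere in this file; the topic name and entry
    // ids are made up for illustration.
    #[test]
    fn test_provider_level_index_collector_round_trip() {
        use std::sync::Arc;

        use crate::kafka::index::collector::{IndexCollector, ProviderLevelIndexCollector};
        use crate::kafka::index::encoder::DatanodeWalIndexes;

        let provider = Arc::new(KafkaProvider::new("my_topic_1".to_string()));
        let mut collector = ProviderLevelIndexCollector {
            indexes: Default::default(),
            provider: provider.clone(),
        };
        let region_id = RegionId::new(1, 1);
        collector.append(region_id, 1);
        collector.append(region_id, 3);
        // Drops the entry ids smaller than 3.
        collector.truncate(region_id, 3);

        let encoder = JsonIndexEncoder::default();
        collector.dump(&encoder);
        let bytes = encoder.finish().unwrap();

        let indexes = DatanodeWalIndexes::decode(&bytes)
            .unwrap()
            .provider(&provider)
            .unwrap();
        assert_eq!(indexes.last_index(), 3);
        assert_eq!(indexes.region(region_id).unwrap(), BTreeSet::from([3]));
    }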
}