use std::collections::{BTreeSet, HashMap};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;

use common_telemetry::{error, info};
use futures::future::try_join_all;
use object_store::ErrorKind;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use store_api::logstore::provider::KafkaProvider;
use store_api::logstore::EntryId;
use store_api::storage::RegionId;
use tokio::select;
use tokio::sync::mpsc::Sender;
use tokio::sync::Mutex as TokioMutex;

use crate::error::{self, Result};
use crate::kafka::index::encoder::{DatanodeWalIndexes, IndexEncoder};
use crate::kafka::index::{default_index_file, JsonIndexEncoder};
use crate::kafka::worker::{DumpIndexRequest, TruncateIndexRequest, WorkerRequest};
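/// Collects and maintains the [`EntryId`]s of WAL entries for regions.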
pub trait IndexCollector: Send + Sync {
    /// Appends an [`EntryId`] for a specific region.
    fn append(&mut self, region_id: RegionId, entry_id: EntryId);

    /// Truncates the index of a specific region up to the given [`EntryId`],
    /// removing all smaller [`EntryId`]s.
    fn truncate(&mut self, region_id: RegionId, entry_id: EntryId);

    /// Sets the latest [`EntryId`].
    fn set_latest_entry_id(&mut self, entry_id: EntryId);

    /// Dumps the collected index via the given [`IndexEncoder`].
    fn dump(&mut self, encoder: &dyn IndexEncoder);
}
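/// Tracks the [`WorkerRequest`] senders of the background producer workers for each
/// [`KafkaProvider`] and periodically persists their WAL indexes to the object store.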
#[derive(Debug)]
pub struct GlobalIndexCollector {
    providers: Arc<TokioMutex<HashMap<Arc<KafkaProvider>, Sender<WorkerRequest>>>>,
    operator: object_store::ObjectStore,
    _handle: CollectionTaskHandle,
}
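/// Background task that periodically dumps the WAL indexes of all registered providers
/// to `path` on the object store.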
#[derive(Debug, Clone)]
pub struct CollectionTask {
    providers: Arc<TokioMutex<HashMap<Arc<KafkaProvider>, Sender<WorkerRequest>>>>,
    dump_index_interval: Duration,
    operator: object_store::ObjectStore,
    path: String,
    running: Arc<AtomicBool>,
}
impl CollectionTask {
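    /// Sends a [`DumpIndexRequest`] to every registered worker, waits for the dumps to
    /// complete, and writes the encoded indexes to `self.path`.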
    async fn dump_index(&self) -> Result<()> {
        let encoder = Arc::new(JsonIndexEncoder::default());
        let receivers = {
            let providers = self.providers.lock().await;
            let mut receivers = Vec::with_capacity(providers.len());
            for (provider, sender) in providers.iter() {
                let (req, rx) = DumpIndexRequest::new(encoder.clone());
                receivers.push(rx);
                if sender.send(WorkerRequest::DumpIndex(req)).await.is_err() {
                    error!(
                        "BackgroundProducerWorker is stopped, topic: {}",
                        provider.topic
                    )
                }
            }
            receivers
        };
        try_join_all(receivers)
            .await
            .context(error::WaitDumpIndexSnafu)?;
        let bytes = encoder.finish()?;
        let mut writer = self
            .operator
            .writer(&self.path)
            .await
            .context(error::CreateWriterSnafu)?;
        writer.write(bytes).await.context(error::WriteIndexSnafu)?;
        writer.close().await.context(error::WriteIndexSnafu)?;

        Ok(())
    }
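    /// Spawns a background task that dumps the WAL index every `dump_index_interval`
    /// until the returned [`CollectionTaskHandle`] is dropped.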
    fn run(self) -> CollectionTaskHandle {
        let mut dump_index_interval = tokio::time::interval(self.dump_index_interval);
        let running = self.running.clone();
        let moved_self = self.clone();
        common_runtime::spawn_global(async move {
            loop {
                if !running.load(Ordering::Relaxed) {
                    info!("Shutting down the index collection task");
                    break;
                }
                select! {
                    _ = dump_index_interval.tick() => {
                        if let Err(err) = moved_self.dump_index().await {
                            error!(err; "Failed to persist the WAL index");
                        }
                    },
                }
            }
        });
        CollectionTaskHandle {
            running: self.running.clone(),
        }
    }
}
impl Drop for CollectionTaskHandle {
    fn drop(&mut self) {
        self.running.store(false, Ordering::Relaxed);
    }
}
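/// Handle of the background [`CollectionTask`]; dropping it stops the task.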
#[derive(Debug, Default)]
struct CollectionTaskHandle {
    running: Arc<AtomicBool>,
}
impl GlobalIndexCollector {
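    /// Creates a [`GlobalIndexCollector`] and spawns a background [`CollectionTask`]
    /// that persists the WAL index to `path` every `dump_index_interval`.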
    pub fn new(
        dump_index_interval: Duration,
        operator: object_store::ObjectStore,
        path: String,
    ) -> Self {
        let providers: Arc<TokioMutex<HashMap<Arc<KafkaProvider>, Sender<WorkerRequest>>>> =
            Arc::new(Default::default());
        let task = CollectionTask {
            providers: providers.clone(),
            dump_index_interval,
            operator: operator.clone(),
            path,
            running: Arc::new(AtomicBool::new(true)),
        };
        let handle = task.run();
        Self {
            providers,
            operator,
            _handle: handle,
        }
    }
    #[cfg(test)]
    pub fn new_for_test(operator: object_store::ObjectStore) -> Self {
        Self {
            providers: Default::default(),
            operator,
            _handle: Default::default(),
        }
    }
}
impl GlobalIndexCollector {
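    /// Reads the WAL index file at [`default_index_file`]`(location_id)` and returns the
    /// [`EntryId`]s of `region_id` that are greater than or equal to `entry_id`, together
    /// with the latest [`EntryId`] of the provider.
    ///
    /// Returns `None` if the index file does not exist or contains no data for `provider`.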
    pub(crate) async fn read_remote_region_index(
        &self,
        location_id: u64,
        provider: &KafkaProvider,
        region_id: RegionId,
        entry_id: EntryId,
    ) -> Result<Option<(BTreeSet<EntryId>, EntryId)>> {
        let path = default_index_file(location_id);

        let bytes = match self.operator.read(&path).await {
            Ok(bytes) => bytes.to_vec(),
            Err(err) => {
                if err.kind() == ErrorKind::NotFound {
                    return Ok(None);
                } else {
                    return Err(err).context(error::ReadIndexSnafu { path });
                }
            }
        };

        match DatanodeWalIndexes::decode(&bytes)?.provider(provider) {
            Some(indexes) => {
                let last_index = indexes.last_index();
                let indexes = indexes
                    .region(region_id)
                    .unwrap_or_default()
                    .split_off(&entry_id);

                Ok(Some((indexes, last_index)))
            }
            None => Ok(None),
        }
    }
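    /// Registers `sender` for `provider` and returns a provider-level [`IndexCollector`].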
    pub(crate) async fn provider_level_index_collector(
        &self,
        provider: Arc<KafkaProvider>,
        sender: Sender<WorkerRequest>,
    ) -> Box<dyn IndexCollector> {
        self.providers.lock().await.insert(provider.clone(), sender);
        Box::new(ProviderLevelIndexCollector {
            indexes: Default::default(),
            provider,
        })
    }
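    /// Notifies the background worker of `provider` to truncate the index of `region_id`
    /// up to `entry_id`.
    ///
    /// Returns an error if the worker has stopped.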
    pub(crate) async fn truncate(
        &self,
        provider: &Arc<KafkaProvider>,
        region_id: RegionId,
        entry_id: EntryId,
    ) -> Result<()> {
        if let Some(sender) = self.providers.lock().await.get(provider).cloned() {
            if sender
                .send(WorkerRequest::TruncateIndex(TruncateIndexRequest::new(
                    region_id, entry_id,
                )))
                .await
                .is_err()
            {
                return error::OrderedBatchProducerStoppedSnafu {}.fail();
            }
        }

        Ok(())
    }
}
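/// The collected [`EntryId`]s of each region under a provider (topic), together with the
/// latest [`EntryId`] observed for that provider.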
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct RegionIndexes {
    pub(crate) regions: HashMap<RegionId, BTreeSet<EntryId>>,
    pub(crate) latest_entry_id: EntryId,
}
impl RegionIndexes {
    fn append(&mut self, region_id: RegionId, entry_id: EntryId) {
        self.regions.entry(region_id).or_default().insert(entry_id);
        self.latest_entry_id = self.latest_entry_id.max(entry_id);
    }

    fn truncate(&mut self, region_id: RegionId, entry_id: EntryId) {
        if let Some(entry_ids) = self.regions.get_mut(&region_id) {
            *entry_ids = entry_ids.split_off(&entry_id);
            self.latest_entry_id = self.latest_entry_id.max(entry_id);
        }
    }

    fn set_latest_entry_id(&mut self, entry_id: EntryId) {
        self.latest_entry_id = entry_id;
    }
}
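/// An [`IndexCollector`] that collects [`RegionIndexes`] for a single [`KafkaProvider`].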
#[derive(Debug, Clone)]
pub struct ProviderLevelIndexCollector {
    indexes: RegionIndexes,
    provider: Arc<KafkaProvider>,
}
impl IndexCollector for ProviderLevelIndexCollector {
    fn append(&mut self, region_id: RegionId, entry_id: EntryId) {
        self.indexes.append(region_id, entry_id)
    }

    fn truncate(&mut self, region_id: RegionId, entry_id: EntryId) {
        self.indexes.truncate(region_id, entry_id)
    }

    fn set_latest_entry_id(&mut self, entry_id: EntryId) {
        self.indexes.set_latest_entry_id(entry_id);
    }

    fn dump(&mut self, encoder: &dyn IndexEncoder) {
        encoder.encode(&self.provider, &self.indexes)
    }
}
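/// An [`IndexCollector`] that ignores all operations.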
pub struct NoopCollector;
319impl IndexCollector for NoopCollector {
320 fn append(&mut self, _region_id: RegionId, _entry_id: EntryId) {}
321
322 fn truncate(&mut self, _region_id: RegionId, _entry_id: EntryId) {}
323
324 fn set_latest_entry_id(&mut self, _entry_id: EntryId) {}
325
326 fn dump(&mut self, _encoder: &dyn IndexEncoder) {}
327}
328
#[cfg(test)]
mod tests {
    use std::collections::{BTreeSet, HashMap};

    use store_api::logstore::provider::KafkaProvider;
    use store_api::storage::RegionId;

    use crate::kafka::index::collector::RegionIndexes;
    use crate::kafka::index::encoder::IndexEncoder;
    use crate::kafka::index::JsonIndexEncoder;
    use crate::kafka::{default_index_file, GlobalIndexCollector};

    #[tokio::test]
    async fn test_read_remote_region_index() {
        let operator = object_store::ObjectStore::new(object_store::services::Memory::default())
            .unwrap()
            .finish();

        let path = default_index_file(0);
        let encoder = JsonIndexEncoder::default();
        encoder.encode(
            &KafkaProvider::new("my_topic_0".to_string()),
            &RegionIndexes {
                regions: HashMap::from([(RegionId::new(1, 1), BTreeSet::from([1, 5, 15]))]),
                latest_entry_id: 20,
            },
        );
        let bytes = encoder.finish().unwrap();
        let mut writer = operator.writer(&path).await.unwrap();
        writer.write(bytes).await.unwrap();
        writer.close().await.unwrap();

        let collector = GlobalIndexCollector::new_for_test(operator.clone());
        // No index file exists for location 1.
        let result = collector
            .read_remote_region_index(
                1,
                &KafkaProvider::new("my_topic_0".to_string()),
                RegionId::new(1, 1),
                1,
            )
            .await
            .unwrap();
        assert!(result.is_none());

        // Region (1, 2) has no entries in the index.
        let (indexes, last_index) = collector
            .read_remote_region_index(
                0,
                &KafkaProvider::new("my_topic_0".to_string()),
                RegionId::new(1, 2),
                5,
            )
            .await
            .unwrap()
            .unwrap();
        assert_eq!(indexes, BTreeSet::new());
        assert_eq!(last_index, 20);

        // Entries of region (1, 1) starting from entry id 5.
        let (indexes, last_index) = collector
            .read_remote_region_index(
                0,
                &KafkaProvider::new("my_topic_0".to_string()),
                RegionId::new(1, 1),
                5,
            )
            .await
            .unwrap()
            .unwrap();
        assert_eq!(indexes, BTreeSet::from([5, 15]));
        assert_eq!(last_index, 20);

        // All entries of region (1, 1) are smaller than entry id 20.
        let (indexes, last_index) = collector
            .read_remote_region_index(
                0,
                &KafkaProvider::new("my_topic_0".to_string()),
                RegionId::new(1, 1),
                20,
            )
            .await
            .unwrap()
            .unwrap();
        assert_eq!(indexes, BTreeSet::new());
        assert_eq!(last_index, 20);
    }
}