log_store/kafka/index/collector.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeSet, HashMap};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;

use common_telemetry::{error, info};
use futures::future::try_join_all;
use object_store::ErrorKind;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use store_api::logstore::EntryId;
use store_api::logstore::provider::KafkaProvider;
use store_api::storage::RegionId;
use tokio::select;
use tokio::sync::Mutex as TokioMutex;
use tokio::sync::mpsc::Sender;

use crate::error::{self, Result};
use crate::kafka::index::encoder::{DatanodeWalIndexes, IndexEncoder};
use crate::kafka::index::{JsonIndexEncoder, default_index_file};
use crate::kafka::worker::{DumpIndexRequest, TruncateIndexRequest, WorkerRequest};

/// The [`IndexCollector`] trait defines the operations for managing and collecting index entries.
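///
/// A minimal sketch of the expected call pattern (illustrative only; it uses the
/// [`NoopCollector`] defined below, which ignores every call):
///
/// ```ignore
/// let mut collector: Box<dyn IndexCollector> = Box::new(NoopCollector);
/// // Record that entry 10 of region (1, 1) was written to the topic.
/// collector.append(RegionId::new(1, 1), 10);
/// // Drop every recorded entry id smaller than 10 for that region.
/// collector.truncate(RegionId::new(1, 1), 10);
/// collector.set_latest_entry_id(10);
/// // Serialize whatever has been collected so far.
/// let encoder = JsonIndexEncoder::default();
/// collector.dump(&encoder);
/// ```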
pub trait IndexCollector: Send + Sync {
    /// Appends an [`EntryId`] for a specific region.
    fn append(&mut self, region_id: RegionId, entry_id: EntryId);

    /// Truncates the index for a specific region up to a given [`EntryId`].
    ///
    /// It removes all [`EntryId`]s smaller than `entry_id`.
    fn truncate(&mut self, region_id: RegionId, entry_id: EntryId);

    /// Sets the latest [`EntryId`].
    fn set_latest_entry_id(&mut self, entry_id: EntryId);

    /// Dumps the index.
    fn dump(&mut self, encoder: &dyn IndexEncoder);
}

/// The [`GlobalIndexCollector`] struct is responsible for managing index entries
/// across multiple providers.
#[derive(Debug)]
pub struct GlobalIndexCollector {
    providers: Arc<TokioMutex<HashMap<Arc<KafkaProvider>, Sender<WorkerRequest>>>>,
    operator: object_store::ObjectStore,
    _handle: CollectionTaskHandle,
}

#[derive(Debug, Clone)]
pub struct CollectionTask {
    providers: Arc<TokioMutex<HashMap<Arc<KafkaProvider>, Sender<WorkerRequest>>>>,
    dump_index_interval: Duration,
    operator: object_store::ObjectStore,
    path: String,
    running: Arc<AtomicBool>,
}

impl CollectionTask {
    async fn dump_index(&self) -> Result<()> {
        let encoder = Arc::new(JsonIndexEncoder::default());
        let receivers = {
            let providers = self.providers.lock().await;
            let mut receivers = Vec::with_capacity(providers.len());
            for (provider, sender) in providers.iter() {
                let (req, rx) = DumpIndexRequest::new(encoder.clone());
                receivers.push(rx);
                if sender.send(WorkerRequest::DumpIndex(req)).await.is_err() {
                    error!(
                        "BackgroundProducerWorker is stopped, topic: {}",
                        provider.topic
                    )
                }
            }
            receivers
        };
        try_join_all(receivers)
            .await
            .context(error::WaitDumpIndexSnafu)?;
        let bytes = encoder.finish()?;
        let mut writer = self
            .operator
            .writer(&self.path)
            .await
            .context(error::CreateWriterSnafu)?;
        writer.write(bytes).await.context(error::WriteIndexSnafu)?;
        writer.close().await.context(error::WriteIndexSnafu)?;

        Ok(())
    }

    /// Spawns the background task.
    ///
    /// The task persists the WAL index to the specified `path` at every `dump_index_interval`
    /// and stops once the returned [`CollectionTaskHandle`] is dropped.
    fn run(self) -> CollectionTaskHandle {
        let mut dump_index_interval = tokio::time::interval(self.dump_index_interval);
        let running = self.running.clone();
        let moved_self = self.clone();
        common_runtime::spawn_global(async move {
            loop {
                if !running.load(Ordering::Relaxed) {
                    info!("shutdown the index collection task");
                    break;
                }
                select! {
                    _ = dump_index_interval.tick() => {
                        if let Err(err) = moved_self.dump_index().await {
                            error!(err; "Failed to persist the WAL index");
                        }
                    },
                }
            }
        });
        CollectionTaskHandle {
            running: self.running.clone(),
        }
    }
}

impl Drop for CollectionTaskHandle {
    fn drop(&mut self) {
        self.running.store(false, Ordering::Relaxed);
    }
}

#[derive(Debug, Default)]
struct CollectionTaskHandle {
    running: Arc<AtomicBool>,
}

impl GlobalIndexCollector {
    /// Constructs a [`GlobalIndexCollector`].
    ///
    /// This method initializes a `GlobalIndexCollector` instance and starts a background task
    /// for managing WAL (Write-Ahead Logging) indexes.
    ///
    /// The background task persists the WAL index to the specified `path` at every `dump_index_interval`.
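    ///
    /// A rough usage sketch (illustrative only; the in-memory object store and the
    /// `default_index_file` helper are the same ones exercised by this module's tests):
    ///
    /// ```ignore
    /// let operator = object_store::ObjectStore::new(object_store::services::Memory::default())
    ///     .unwrap()
    ///     .finish();
    /// // Dump the collected WAL indexes to the index file every 60 seconds.
    /// let collector =
    ///     GlobalIndexCollector::new(Duration::from_secs(60), operator, default_index_file(0));
    /// ```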
    pub fn new(
        dump_index_interval: Duration,
        operator: object_store::ObjectStore,
        path: String,
    ) -> Self {
        let providers: Arc<TokioMutex<HashMap<Arc<KafkaProvider>, Sender<WorkerRequest>>>> =
            Arc::new(Default::default());
        let task = CollectionTask {
            providers: providers.clone(),
            dump_index_interval,
            operator: operator.clone(),
            path,
            running: Arc::new(AtomicBool::new(true)),
        };
        let handle = task.run();
        Self {
            providers,
            operator,
            _handle: handle,
        }
    }

    #[cfg(test)]
    pub fn new_for_test(operator: object_store::ObjectStore) -> Self {
        Self {
            providers: Default::default(),
            operator,
            _handle: Default::default(),
        }
    }
}

impl GlobalIndexCollector {
    /// Retrieves the [`EntryId`]s for a specified `region_id` that are greater than or equal to
    /// a given `entry_id`, reading the index file identified by `location_id`.
    ///
    /// Returns the matching [`EntryId`]s together with the latest [`EntryId`] recorded for the
    /// `provider`, or `None` if the index file or the provider's index does not exist.
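    ///
    /// A sketch of how the result is typically interpreted (illustrative; see also
    /// `test_read_remote_region_index` below):
    ///
    /// ```ignore
    /// // Entry ids >= 5 recorded for region (1, 1) in the index file of location 0,
    /// // plus the latest entry id known for the topic.
    /// if let Some((entry_ids, last_index)) = collector
    ///     .read_remote_region_index(0, &provider, RegionId::new(1, 1), 5)
    ///     .await?
    /// {
    ///     // ...
    /// }
    /// ```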
    pub(crate) async fn read_remote_region_index(
        &self,
        location_id: u64,
        provider: &KafkaProvider,
        region_id: RegionId,
        entry_id: EntryId,
    ) -> Result<Option<(BTreeSet<EntryId>, EntryId)>> {
        let path = default_index_file(location_id);

        let bytes = match self.operator.read(&path).await {
            Ok(bytes) => bytes.to_vec(),
            Err(err) => {
                if err.kind() == ErrorKind::NotFound {
                    return Ok(None);
                } else {
                    return Err(err).context(error::ReadIndexSnafu { path });
                }
            }
        };

        match DatanodeWalIndexes::decode(&bytes)?.provider(provider) {
            Some(indexes) => {
                let last_index = indexes.last_index();
                let indexes = indexes
                    .region(region_id)
                    .unwrap_or_default()
                    .split_off(&entry_id);

                Ok(Some((indexes, last_index)))
            }
            None => Ok(None),
        }
    }

    /// Creates a new [`ProviderLevelIndexCollector`] for a specified provider.
    pub(crate) async fn provider_level_index_collector(
        &self,
        provider: Arc<KafkaProvider>,
        sender: Sender<WorkerRequest>,
    ) -> Box<dyn IndexCollector> {
        self.providers.lock().await.insert(provider.clone(), sender);
        Box::new(ProviderLevelIndexCollector {
            indexes: Default::default(),
            provider,
        })
    }

    /// Truncates the index for a specific region up to a given [`EntryId`].
    ///
    /// It removes all [`EntryId`]s smaller than `entry_id`.
    pub(crate) async fn truncate(
        &self,
        provider: &Arc<KafkaProvider>,
        region_id: RegionId,
        entry_id: EntryId,
    ) -> Result<()> {
        if let Some(sender) = self.providers.lock().await.get(provider).cloned()
            && sender
                .send(WorkerRequest::TruncateIndex(TruncateIndexRequest::new(
                    region_id, entry_id,
                )))
                .await
                .is_err()
        {
            return error::OrderedBatchProducerStoppedSnafu {}.fail();
        }

        Ok(())
    }
}

/// The [`RegionIndexes`] struct maintains indexes for a collection of regions.
/// Each region is identified by a `RegionId` and maps to a set of [`EntryId`]s,
/// representing the entries within that region. It also keeps track of the
/// latest [`EntryId`] across all regions.
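///
/// A small sketch of the bookkeeping (illustrative; entry ids are plain integers, as in the
/// tests below):
///
/// ```ignore
/// let mut indexes = RegionIndexes::default();
/// indexes.append(RegionId::new(1, 1), 1);
/// indexes.append(RegionId::new(1, 1), 5);
/// // Truncation keeps only entry ids >= 5; `latest_entry_id` stays at 5.
/// indexes.truncate(RegionId::new(1, 1), 5);
/// assert_eq!(indexes.regions[&RegionId::new(1, 1)], BTreeSet::from([5]));
/// assert_eq!(indexes.latest_entry_id, 5);
/// ```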
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct RegionIndexes {
    pub(crate) regions: HashMap<RegionId, BTreeSet<EntryId>>,
    pub(crate) latest_entry_id: EntryId,
}

impl RegionIndexes {
    fn append(&mut self, region_id: RegionId, entry_id: EntryId) {
        self.regions.entry(region_id).or_default().insert(entry_id);
        self.latest_entry_id = self.latest_entry_id.max(entry_id);
    }

    fn truncate(&mut self, region_id: RegionId, entry_id: EntryId) {
        if let Some(entry_ids) = self.regions.get_mut(&region_id) {
            *entry_ids = entry_ids.split_off(&entry_id);
            // The region's entry id set can become empty after truncation,
            // but we still keep tracking the latest entry id.
            self.latest_entry_id = self.latest_entry_id.max(entry_id);
        }
    }

    fn set_latest_entry_id(&mut self, entry_id: EntryId) {
        self.latest_entry_id = entry_id;
    }
}

/// The [`ProviderLevelIndexCollector`] struct is responsible for managing index entries
/// specific to a particular provider.
#[derive(Debug, Clone)]
pub struct ProviderLevelIndexCollector {
    indexes: RegionIndexes,
    provider: Arc<KafkaProvider>,
}

impl IndexCollector for ProviderLevelIndexCollector {
    fn append(&mut self, region_id: RegionId, entry_id: EntryId) {
        self.indexes.append(region_id, entry_id)
    }

    fn truncate(&mut self, region_id: RegionId, entry_id: EntryId) {
        self.indexes.truncate(region_id, entry_id)
    }

    fn set_latest_entry_id(&mut self, entry_id: EntryId) {
        self.indexes.set_latest_entry_id(entry_id);
    }

    fn dump(&mut self, encoder: &dyn IndexEncoder) {
        encoder.encode(&self.provider, &self.indexes)
    }
}

/// The [`NoopCollector`] struct implements the [`IndexCollector`] trait with no-op methods.
///
/// This collector effectively ignores all operations, making it suitable for cases
/// where index collection is not required or should be disabled.
pub struct NoopCollector;

impl IndexCollector for NoopCollector {
    fn append(&mut self, _region_id: RegionId, _entry_id: EntryId) {}

    fn truncate(&mut self, _region_id: RegionId, _entry_id: EntryId) {}

    fn set_latest_entry_id(&mut self, _entry_id: EntryId) {}

    fn dump(&mut self, _encoder: &dyn IndexEncoder) {}
}

#[cfg(test)]
mod tests {
    use std::collections::{BTreeSet, HashMap};

    use store_api::logstore::provider::KafkaProvider;
    use store_api::storage::RegionId;

    use crate::kafka::index::JsonIndexEncoder;
    use crate::kafka::index::collector::RegionIndexes;
    use crate::kafka::index::encoder::IndexEncoder;
    use crate::kafka::{GlobalIndexCollector, default_index_file};

    #[tokio::test]
    async fn test_read_remote_region_index() {
        let operator = object_store::ObjectStore::new(object_store::services::Memory::default())
            .unwrap()
            .finish();

        let path = default_index_file(0);
        let encoder = JsonIndexEncoder::default();
        encoder.encode(
            &KafkaProvider::new("my_topic_0".to_string()),
            &RegionIndexes {
                regions: HashMap::from([(RegionId::new(1, 1), BTreeSet::from([1, 5, 15]))]),
                latest_entry_id: 20,
            },
        );
        let bytes = encoder.finish().unwrap();
        let mut writer = operator.writer(&path).await.unwrap();
        writer.write(bytes).await.unwrap();
        writer.close().await.unwrap();

        let collector = GlobalIndexCollector::new_for_test(operator.clone());
        // Index file doesn't exist
        let result = collector
            .read_remote_region_index(
                1,
                &KafkaProvider::new("my_topic_0".to_string()),
                RegionId::new(1, 1),
                1,
            )
            .await
            .unwrap();
        assert!(result.is_none());

        // RegionId doesn't exist
        let (indexes, last_index) = collector
            .read_remote_region_index(
                0,
                &KafkaProvider::new("my_topic_0".to_string()),
                RegionId::new(1, 2),
                5,
            )
            .await
            .unwrap()
            .unwrap();
        assert_eq!(indexes, BTreeSet::new());
        assert_eq!(last_index, 20);

        // RegionId(1, 1), Start EntryId: 5
        let (indexes, last_index) = collector
            .read_remote_region_index(
                0,
                &KafkaProvider::new("my_topic_0".to_string()),
                RegionId::new(1, 1),
                5,
            )
            .await
            .unwrap()
            .unwrap();
        assert_eq!(indexes, BTreeSet::from([5, 15]));
        assert_eq!(last_index, 20);

        // RegionId(1, 1), Start EntryId: 20
        let (indexes, last_index) = collector
            .read_remote_region_index(
                0,
                &KafkaProvider::new("my_topic_0".to_string()),
                RegionId::new(1, 1),
                20,
            )
            .await
            .unwrap()
            .unwrap();
        assert_eq!(indexes, BTreeSet::new());
        assert_eq!(last_index, 20);
    }
}