log_store/kafka/index/collector.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeSet, HashMap};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;

use common_telemetry::{error, info};
use futures::future::try_join_all;
use object_store::ErrorKind;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use store_api::logstore::provider::KafkaProvider;
use store_api::logstore::EntryId;
use store_api::storage::RegionId;
use tokio::select;
use tokio::sync::mpsc::Sender;
use tokio::sync::Mutex as TokioMutex;

use crate::error::{self, Result};
use crate::kafka::index::encoder::{DatanodeWalIndexes, IndexEncoder};
use crate::kafka::index::{default_index_file, JsonIndexEncoder};
use crate::kafka::worker::{DumpIndexRequest, TruncateIndexRequest, WorkerRequest};

/// The [`IndexCollector`] trait defines the operations for managing and collecting index entries.
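///
/// A minimal usage sketch (`collector` stands for any implementor such as
/// [`ProviderLevelIndexCollector`]; the ids are illustrative, and the block is not
/// compiled as a doctest):
///
/// ```ignore
/// // Record entry 42 for a region, drop entry ids below 10, then persist via an encoder.
/// let encoder = JsonIndexEncoder::default();
/// collector.append(RegionId::new(1, 1), 42);
/// collector.truncate(RegionId::new(1, 1), 10);
/// collector.set_latest_entry_id(42);
/// collector.dump(&encoder);
/// ```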
pub trait IndexCollector: Send + Sync {
    /// Appends an [`EntryId`] for a specific region.
    fn append(&mut self, region_id: RegionId, entry_id: EntryId);

    /// Truncates the index for a specific region up to a given [`EntryId`].
    ///
    /// It removes all [`EntryId`]s smaller than `entry_id`.
    fn truncate(&mut self, region_id: RegionId, entry_id: EntryId);

    /// Sets the latest [`EntryId`].
    fn set_latest_entry_id(&mut self, entry_id: EntryId);

    /// Dumps the index.
    fn dump(&mut self, encoder: &dyn IndexEncoder);
}

/// The [`GlobalIndexCollector`] struct is responsible for managing index entries
/// across multiple providers.
#[derive(Debug)]
pub struct GlobalIndexCollector {
    providers: Arc<TokioMutex<HashMap<Arc<KafkaProvider>, Sender<WorkerRequest>>>>,
    operator: object_store::ObjectStore,
    _handle: CollectionTaskHandle,
}

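/// A background task that periodically collects the WAL index from every registered
/// provider worker and persists it to `path` on the object store.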
#[derive(Debug, Clone)]
pub struct CollectionTask {
    providers: Arc<TokioMutex<HashMap<Arc<KafkaProvider>, Sender<WorkerRequest>>>>,
    dump_index_interval: Duration,
    operator: object_store::ObjectStore,
    path: String,
    running: Arc<AtomicBool>,
}

impl CollectionTask {
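    /// Sends a [`DumpIndexRequest`] to each registered provider worker, waits for all of
    /// them to encode their indexes, and then writes the encoded bytes to `self.path`.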
    async fn dump_index(&self) -> Result<()> {
        let encoder = Arc::new(JsonIndexEncoder::default());
        let receivers = {
            let providers = self.providers.lock().await;
            let mut receivers = Vec::with_capacity(providers.len());
            for (provider, sender) in providers.iter() {
                let (req, rx) = DumpIndexRequest::new(encoder.clone());
                receivers.push(rx);
                if sender.send(WorkerRequest::DumpIndex(req)).await.is_err() {
                    error!(
                        "BackgroundProducerWorker is stopped, topic: {}",
                        provider.topic
                    )
                }
            }
            receivers
        };
        try_join_all(receivers)
            .await
            .context(error::WaitDumpIndexSnafu)?;
        let bytes = encoder.finish()?;
        let mut writer = self
            .operator
            .writer(&self.path)
            .await
            .context(error::CreateWriterSnafu)?;
        writer.write(bytes).await.context(error::WriteIndexSnafu)?;
        writer.close().await.context(error::WriteIndexSnafu)?;

        Ok(())
    }

    /// Spawns a background task that persists the WAL index to the specified `path` at every
    /// `dump_index_interval`, and returns a [`CollectionTaskHandle`] that stops the task when dropped.
    fn run(self) -> CollectionTaskHandle {
        let mut dump_index_interval = tokio::time::interval(self.dump_index_interval);
        let running = self.running.clone();
        let moved_self = self.clone();
        common_runtime::spawn_global(async move {
            loop {
                if !running.load(Ordering::Relaxed) {
                    info!("shutdown the index collection task");
                    break;
                }
                select! {
                    _ = dump_index_interval.tick() => {
                        if let Err(err) = moved_self.dump_index().await {
                            error!(err; "Failed to persist the WAL index");
                        }
                    },
                }
            }
        });
        CollectionTaskHandle {
            running: self.running.clone(),
        }
    }
}

impl Drop for CollectionTaskHandle {
    fn drop(&mut self) {
        self.running.store(false, Ordering::Relaxed);
    }
}

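/// A handle to the background [`CollectionTask`]; dropping it stops the task.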
#[derive(Debug, Default)]
struct CollectionTaskHandle {
    running: Arc<AtomicBool>,
}

impl GlobalIndexCollector {
    /// Constructs a [`GlobalIndexCollector`].
    ///
    /// This method initializes a `GlobalIndexCollector` instance and starts a background task
    /// for managing WAL (Write-Ahead Logging) indexes.
    ///
    /// The background task persists the WAL index to the specified `path` at every `dump_index_interval`.
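    ///
    /// A construction sketch (the in-memory backend, interval, and path are illustrative
    /// only; the block is not compiled as a doctest):
    ///
    /// ```ignore
    /// let operator = object_store::ObjectStore::new(object_store::services::Memory::default())
    ///     .unwrap()
    ///     .finish();
    /// let collector = GlobalIndexCollector::new(
    ///     Duration::from_secs(60),
    ///     operator,
    ///     "path/to/index-file".to_string(),
    /// );
    /// ```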
    pub fn new(
        dump_index_interval: Duration,
        operator: object_store::ObjectStore,
        path: String,
    ) -> Self {
        let providers: Arc<TokioMutex<HashMap<Arc<KafkaProvider>, Sender<WorkerRequest>>>> =
            Arc::new(Default::default());
        let task = CollectionTask {
            providers: providers.clone(),
            dump_index_interval,
            operator: operator.clone(),
            path,
            running: Arc::new(AtomicBool::new(true)),
        };
        let handle = task.run();
        Self {
            providers,
            operator,
            _handle: handle,
        }
    }

    #[cfg(test)]
    pub fn new_for_test(operator: object_store::ObjectStore) -> Self {
        Self {
            providers: Default::default(),
            operator,
            _handle: Default::default(),
        }
    }
}

impl GlobalIndexCollector {
    /// Retrieves the [`EntryId`]s for a specified `region_id` that are greater than or
    /// equal to a given `entry_id`, reading from the remote index file identified by `location_id`.
    ///
    /// Returns `None` if the index file or the provider's section does not exist.
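    ///
    /// A call sketch (the `collector`, `provider`, and ids are illustrative; the block is
    /// not compiled as a doctest):
    ///
    /// ```ignore
    /// // Entry ids >= 5 for region (1, 1), read from the index file of location 0.
    /// let result = collector
    ///     .read_remote_region_index(0, &provider, RegionId::new(1, 1), 5)
    ///     .await?;
    /// ```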
    pub(crate) async fn read_remote_region_index(
        &self,
        location_id: u64,
        provider: &KafkaProvider,
        region_id: RegionId,
        entry_id: EntryId,
    ) -> Result<Option<(BTreeSet<EntryId>, EntryId)>> {
        let path = default_index_file(location_id);

        let bytes = match self.operator.read(&path).await {
            Ok(bytes) => bytes.to_vec(),
            Err(err) => {
                if err.kind() == ErrorKind::NotFound {
                    return Ok(None);
                } else {
                    return Err(err).context(error::ReadIndexSnafu { path });
                }
            }
        };

        match DatanodeWalIndexes::decode(&bytes)?.provider(provider) {
            Some(indexes) => {
                let last_index = indexes.last_index();
                let indexes = indexes
                    .region(region_id)
                    .unwrap_or_default()
                    .split_off(&entry_id);

                Ok(Some((indexes, last_index)))
            }
            None => Ok(None),
        }
    }

    /// Creates a new [`ProviderLevelIndexCollector`] for a specified provider.
    pub(crate) async fn provider_level_index_collector(
        &self,
        provider: Arc<KafkaProvider>,
        sender: Sender<WorkerRequest>,
    ) -> Box<dyn IndexCollector> {
        self.providers.lock().await.insert(provider.clone(), sender);
        Box::new(ProviderLevelIndexCollector {
            indexes: Default::default(),
            provider,
        })
    }

    /// Truncates the index for a specific region up to a given [`EntryId`].
    ///
    /// It removes all [`EntryId`]s smaller than `entry_id`.
    pub(crate) async fn truncate(
        &self,
        provider: &Arc<KafkaProvider>,
        region_id: RegionId,
        entry_id: EntryId,
    ) -> Result<()> {
        if let Some(sender) = self.providers.lock().await.get(provider).cloned() {
            if sender
                .send(WorkerRequest::TruncateIndex(TruncateIndexRequest::new(
                    region_id, entry_id,
                )))
                .await
                .is_err()
            {
                return error::OrderedBatchProducerStoppedSnafu {}.fail();
            }
        }

        Ok(())
    }
}

/// The [`RegionIndexes`] struct maintains indexes for a collection of regions.
/// Each region is identified by a `RegionId` and maps to a set of [`EntryId`]s,
/// representing the entries within that region. It also keeps track of the
/// latest [`EntryId`] across all regions.
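///
/// A semantics sketch (illustrative values; the block is not compiled as a doctest since
/// these methods are private):
///
/// ```ignore
/// let mut indexes = RegionIndexes::default();
/// indexes.append(RegionId::new(1, 1), 1);
/// indexes.append(RegionId::new(1, 1), 5);
/// indexes.append(RegionId::new(1, 1), 15);
/// // Truncating at 5 drops entry ids smaller than 5, leaving {5, 15}.
/// indexes.truncate(RegionId::new(1, 1), 5);
/// assert_eq!(indexes.latest_entry_id, 15);
/// ```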
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct RegionIndexes {
    pub(crate) regions: HashMap<RegionId, BTreeSet<EntryId>>,
    pub(crate) latest_entry_id: EntryId,
}

impl RegionIndexes {
    fn append(&mut self, region_id: RegionId, entry_id: EntryId) {
        self.regions.entry(region_id).or_default().insert(entry_id);
        self.latest_entry_id = self.latest_entry_id.max(entry_id);
    }

    fn truncate(&mut self, region_id: RegionId, entry_id: EntryId) {
        if let Some(entry_ids) = self.regions.get_mut(&region_id) {
            *entry_ids = entry_ids.split_off(&entry_id);
            // The region's entry id set may now be empty, so keep tracking the latest entry id here.
            self.latest_entry_id = self.latest_entry_id.max(entry_id);
        }
    }

    fn set_latest_entry_id(&mut self, entry_id: EntryId) {
        self.latest_entry_id = entry_id;
    }
}

/// The [`ProviderLevelIndexCollector`] struct is responsible for managing index entries
/// specific to a particular provider.
#[derive(Debug, Clone)]
pub struct ProviderLevelIndexCollector {
    indexes: RegionIndexes,
    provider: Arc<KafkaProvider>,
}

impl IndexCollector for ProviderLevelIndexCollector {
    fn append(&mut self, region_id: RegionId, entry_id: EntryId) {
        self.indexes.append(region_id, entry_id)
    }

    fn truncate(&mut self, region_id: RegionId, entry_id: EntryId) {
        self.indexes.truncate(region_id, entry_id)
    }

    fn set_latest_entry_id(&mut self, entry_id: EntryId) {
        self.indexes.set_latest_entry_id(entry_id);
    }

    fn dump(&mut self, encoder: &dyn IndexEncoder) {
        encoder.encode(&self.provider, &self.indexes)
    }
}

/// The [`NoopCollector`] struct implements the [`IndexCollector`] trait with no-op methods.
///
/// This collector effectively ignores all operations, making it suitable for cases
/// where index collection is not required or should be disabled.
pub struct NoopCollector;

impl IndexCollector for NoopCollector {
    fn append(&mut self, _region_id: RegionId, _entry_id: EntryId) {}

    fn truncate(&mut self, _region_id: RegionId, _entry_id: EntryId) {}

    fn set_latest_entry_id(&mut self, _entry_id: EntryId) {}

    fn dump(&mut self, _encoder: &dyn IndexEncoder) {}
}

#[cfg(test)]
mod tests {
    use std::collections::{BTreeSet, HashMap};

    use store_api::logstore::provider::KafkaProvider;
    use store_api::storage::RegionId;

    use crate::kafka::index::collector::RegionIndexes;
    use crate::kafka::index::encoder::IndexEncoder;
    use crate::kafka::index::JsonIndexEncoder;
    use crate::kafka::{default_index_file, GlobalIndexCollector};

    #[tokio::test]
    async fn test_read_remote_region_index() {
        let operator = object_store::ObjectStore::new(object_store::services::Memory::default())
            .unwrap()
            .finish();

        let path = default_index_file(0);
        let encoder = JsonIndexEncoder::default();
        encoder.encode(
            &KafkaProvider::new("my_topic_0".to_string()),
            &RegionIndexes {
                regions: HashMap::from([(RegionId::new(1, 1), BTreeSet::from([1, 5, 15]))]),
                latest_entry_id: 20,
            },
        );
        let bytes = encoder.finish().unwrap();
        let mut writer = operator.writer(&path).await.unwrap();
        writer.write(bytes).await.unwrap();
        writer.close().await.unwrap();

        let collector = GlobalIndexCollector::new_for_test(operator.clone());
        // The index file for location 1 doesn't exist.
        let result = collector
            .read_remote_region_index(
                1,
                &KafkaProvider::new("my_topic_0".to_string()),
                RegionId::new(1, 1),
                1,
            )
            .await
            .unwrap();
        assert!(result.is_none());

        // RegionId doesn't exist
        let (indexes, last_index) = collector
            .read_remote_region_index(
                0,
                &KafkaProvider::new("my_topic_0".to_string()),
                RegionId::new(1, 2),
                5,
            )
            .await
            .unwrap()
            .unwrap();
        assert_eq!(indexes, BTreeSet::new());
        assert_eq!(last_index, 20);

        // RegionId(1, 1), Start EntryId: 5
        let (indexes, last_index) = collector
            .read_remote_region_index(
                0,
                &KafkaProvider::new("my_topic_0".to_string()),
                RegionId::new(1, 1),
                5,
            )
            .await
            .unwrap()
            .unwrap();
        assert_eq!(indexes, BTreeSet::from([5, 15]));
        assert_eq!(last_index, 20);

        // RegionId(1, 1), Start EntryId: 20
        let (indexes, last_index) = collector
            .read_remote_region_index(
                0,
                &KafkaProvider::new("my_topic_0".to_string()),
                RegionId::new(1, 1),
                20,
            )
            .await
            .unwrap()
            .unwrap();
        assert_eq!(indexes, BTreeSet::new());
        assert_eq!(last_index, 20);
    }
}