log_store/kafka/util/record.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use rskafka::record::Record as KafkaRecord;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::logstore::entry::{Entry, MultiplePartEntry, MultiplePartHeader, NaiveEntry};
use store_api::logstore::provider::{KafkaProvider, Provider};
use store_api::storage::RegionId;

use crate::error::{
    DecodeJsonSnafu, EncodeJsonSnafu, IllegalSequenceSnafu, MetaLengthExceededLimitSnafu,
    MissingKeySnafu, MissingValueSnafu, Result,
};
use crate::kafka::{EntryId, NamespaceImpl};

/// The current version of Record.
pub(crate) const VERSION: u32 = 0;

/// The estimated size in bytes of a serialized RecordMeta.
/// An entry's data is chunked so that sizeof(data) <= max_batch_byte - ESTIMATED_META_SIZE,
/// and the serialized meta is checked to be smaller than ESTIMATED_META_SIZE. Together these
/// guarantee sizeof(meta) + sizeof(data) <= max_batch_byte for every record.
pub(crate) const ESTIMATED_META_SIZE: usize = 256;

/// The type of a record.
///
/// - If the entry is able to fit into a Kafka record, it's converted into a Full record.
///
/// - If the entry is too large to fit into a Kafka record, it's converted into a collection of
///   records. Those records must contain exactly one First record and one Last record, plus
///   zero or more Middle records.
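///
/// For example, following the sequence validation below, an entry split into four parts is
/// encoded as the record sequence `[First, Middle(1), Middle(2), Last]`, while a small entry
/// is encoded as a single `[Full]` record.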
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
pub enum RecordType {
    /// The record is self-contained, i.e. an entry's data is fully stored into this record.
    Full,
    /// The record contains the first part of an entry's data.
    First,
    /// The record contains one of the middle parts of an entry's data.
    /// The sequence of the record is identified by the inner field.
    Middle(usize),
    /// The record contains the last part of an entry's data.
    Last,
}

/// The metadata of a record.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct RecordMeta {
    /// The version of the record. Used for backward compatibility.
    version: u32,
    /// The type of the record.
    pub tp: RecordType,
    /// The id of the entry this record is associated with.
    pub entry_id: EntryId,
    /// The namespace of the entry this record is associated with.
    pub ns: NamespaceImpl,
}

/// The minimal storage unit in the Kafka log store.
///
/// An entry is converted into one or more Records before being produced.
/// If an entry is able to fit into a KafkaRecord, it's converted into a single Record.
/// Otherwise, it's split into a collection of Records.
///
/// A KafkaRecord is the minimal storage unit used by the Kafka client and the Kafka server.
/// The Kafka client produces and consumes KafkaRecords, and the Kafka server stores
/// collections of KafkaRecords.
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct Record {
    /// The metadata of the record.
    pub(crate) meta: RecordMeta,
    /// The payload of the record.
    data: Vec<u8>,
}

impl TryFrom<Record> for KafkaRecord {
    type Error = crate::error::Error;

    fn try_from(record: Record) -> Result<Self> {
        let key = serde_json::to_vec(&record.meta).context(EncodeJsonSnafu)?;
        ensure!(
            key.len() < ESTIMATED_META_SIZE,
            MetaLengthExceededLimitSnafu {
                limit: ESTIMATED_META_SIZE,
                actual: key.len()
            }
        );
        Ok(KafkaRecord {
            key: Some(key),
            value: Some(record.data),
            timestamp: chrono::Utc::now(),
            headers: Default::default(),
        })
    }
}

// TODO(niebayes): improve the performance of decoding kafka record.
impl TryFrom<KafkaRecord> for Record {
    type Error = crate::error::Error;

    fn try_from(kafka_record: KafkaRecord) -> Result<Self> {
        let key = kafka_record.key.context(MissingKeySnafu)?;
        let meta = serde_json::from_slice(&key).context(DecodeJsonSnafu)?;
        let data = kafka_record.value.context(MissingValueSnafu)?;
        Ok(Self { meta, data })
    }
}

/// Converts an [Entry] into one or more [KafkaRecord]s.
pub(crate) fn convert_to_kafka_records(entry: Entry) -> Result<Vec<KafkaRecord>> {
    match entry {
        Entry::Naive(entry) => Ok(vec![KafkaRecord::try_from(Record {
            meta: RecordMeta {
                version: VERSION,
                tp: RecordType::Full,
                // TODO(weny): refactor the record meta.
                entry_id: 0,
                ns: NamespaceImpl {
                    region_id: entry.region_id.as_u64(),
                    // TODO(weny): refactor the record meta.
                    topic: String::new(),
                },
            },
            data: entry.data,
        })?]),
        Entry::MultiplePart(entry) => {
            let mut entries = Vec::with_capacity(entry.parts.len());

            for (idx, part) in entry.parts.into_iter().enumerate() {
                let tp = match entry.headers[idx] {
                    MultiplePartHeader::First => RecordType::First,
                    MultiplePartHeader::Middle(i) => RecordType::Middle(i),
                    MultiplePartHeader::Last => RecordType::Last,
                };
                entries.push(KafkaRecord::try_from(Record {
                    meta: RecordMeta {
                        version: VERSION,
                        tp,
                        // TODO(weny): refactor the record meta.
                        entry_id: 0,
                        ns: NamespaceImpl {
                            region_id: entry.region_id.as_u64(),
                            topic: String::new(),
                        },
                    },
                    data: part,
                })?)
            }
            Ok(entries)
        }
    }
}

/// Converts a [RecordType::Full] record into an [Entry::Naive] entry.
fn convert_to_naive_entry(provider: Arc<KafkaProvider>, record: Record) -> Entry {
    let region_id = RegionId::from_u64(record.meta.ns.region_id);

    Entry::Naive(NaiveEntry {
        provider: Provider::Kafka(provider),
        region_id,
        entry_id: record.meta.entry_id,
        data: record.data,
    })
}

/// Assembles a collection of records into an [Entry::MultiplePart] entry.
fn convert_to_multiple_entry(
    provider: Arc<KafkaProvider>,
    region_id: RegionId,
    records: Vec<Record>,
) -> Entry {
    let mut headers = Vec::with_capacity(records.len());
    let mut parts = Vec::with_capacity(records.len());
    let entry_id = records.last().map(|r| r.meta.entry_id).unwrap_or_default();

    for record in records {
        let header = match record.meta.tp {
            RecordType::Full => unreachable!(),
            RecordType::First => MultiplePartHeader::First,
            RecordType::Middle(i) => MultiplePartHeader::Middle(i),
            RecordType::Last => MultiplePartHeader::Last,
        };
        headers.push(header);
        parts.push(record.data);
    }

    Entry::MultiplePart(MultiplePartEntry {
        provider: Provider::Kafka(provider),
        region_id,
        entry_id,
        headers,
        parts,
    })
}

/// Drains `buffered_records` and constructs an [Entry] for each buffered region.
/// Returns [None] if there are no buffered records.
pub fn remaining_entries(
    provider: &Arc<KafkaProvider>,
    buffered_records: &mut HashMap<RegionId, Vec<Record>>,
) -> Option<Vec<Entry>> {
    if buffered_records.is_empty() {
        None
    } else {
        let mut entries = Vec::with_capacity(buffered_records.len());
        for (region_id, records) in buffered_records.drain() {
            entries.push(convert_to_multiple_entry(
                provider.clone(),
                region_id,
                records,
            ));
        }
        Some(entries)
    }
}

/// For a [RecordType::Full] record:
/// - Emits an [Entry::Naive] entry immediately.
///
/// For the parts of an [Entry::MultiplePart] entry:
/// - Emits a complete or incomplete [Entry] when the next record of the same [RegionId] arrives.
///
/// **Incomplete entries:**
/// If records arrive in one of the following orders, the bold prefix is emitted as
/// **an incomplete [Entry]**.
/// - **[RecordType::First], [RecordType::Middle]**, [RecordType::First]
/// - **[RecordType::Middle]**, [RecordType::First]
/// - **[RecordType::Last]** (with no buffered records for the region)
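///
/// For example, for a single region, the record stream `[First, Middle(1), Last, Full]`
/// emits a complete multiple-part entry on the `Last` record, then a naive entry on the
/// `Full` record.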
pub(crate) fn maybe_emit_entry(
    provider: &Arc<KafkaProvider>,
    record: Record,
    buffered_records: &mut HashMap<RegionId, Vec<Record>>,
) -> Result<Option<Entry>> {
    let mut entry = None;
    match record.meta.tp {
        RecordType::Full => entry = Some(convert_to_naive_entry(provider.clone(), record)),
        RecordType::First => {
            let region_id = record.meta.ns.region_id.into();
            if let Some(records) = buffered_records.insert(region_id, vec![record]) {
                // The previous entry never received its Last record: emit it as incomplete.
                entry = Some(convert_to_multiple_entry(
                    provider.clone(),
                    region_id,
                    records,
                ))
            }
        }
        RecordType::Middle(seq) => {
            let region_id = record.meta.ns.region_id.into();
            let records = buffered_records.entry(region_id).or_default();

            // Only validate the sequence when records are already buffered.
            if !records.is_empty() {
                // Safety: `records` is guaranteed non-empty inside this branch.
                let last_record = records.last().unwrap();
                let legal = match last_record.meta.tp {
                    // Legal if this record follows a First record.
                    RecordType::First => seq == 1,
                    // Legal if this record directly follows the previous Middle record.
                    RecordType::Middle(last_seq) => last_seq + 1 == seq,
                    // Illegal sequence.
                    _ => false,
                };
                ensure!(
                    legal,
                    IllegalSequenceSnafu {
                        error: format!(
                            "Illegal sequence of a middle record, last record: {:?}, incoming record: {:?}",
                            last_record.meta.tp,
                            record.meta.tp
                        )
                    }
                );
            }

            records.push(record);
        }
        RecordType::Last => {
            let region_id = record.meta.ns.region_id.into();
            if let Some(mut records) = buffered_records.remove(&region_id) {
                records.push(record);
                entry = Some(convert_to_multiple_entry(
                    provider.clone(),
                    region_id,
                    records,
                ))
            } else {
                // No buffered records for this region: emit the Last record alone
                // as an incomplete entry.
                entry = Some(convert_to_multiple_entry(
                    provider.clone(),
                    region_id,
                    vec![record],
                ))
            }
        }
    }
    Ok(entry)
}

#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::sync::Arc;

    use super::*;
    use crate::error;

    fn new_test_record(tp: RecordType, entry_id: EntryId, region_id: u64, data: Vec<u8>) -> Record {
        Record {
            meta: RecordMeta {
                version: VERSION,
                tp,
                ns: NamespaceImpl {
                    region_id,
                    topic: "greptimedb_wal_topic".to_string(),
                },
                entry_id,
            },
            data,
        }
    }

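    // A minimal round-trip sketch: a Record encoded into a KafkaRecord via the TryFrom
    // impls above should decode back to an equal Record.
    #[test]
    fn test_record_roundtrip() {
        let region_id = RegionId::new(1, 1);
        let record = new_test_record(RecordType::Full, 1, region_id.as_u64(), vec![1; 100]);
        let kafka_record = KafkaRecord::try_from(record.clone()).unwrap();
        let decoded = Record::try_from(kafka_record).unwrap();
        assert_eq!(record, decoded);
    }
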
    #[test]
    fn test_maybe_emit_entry_emit_naive_entry() {
        let provider = Arc::new(KafkaProvider::new("my_topic".to_string()));
        let region_id = RegionId::new(1, 1);
        let mut buffer = HashMap::new();
        let record = new_test_record(RecordType::Full, 1, region_id.as_u64(), vec![1; 100]);
        let entry = maybe_emit_entry(&provider, record, &mut buffer)
            .unwrap()
            .unwrap();
        assert_eq!(
            entry,
            Entry::Naive(NaiveEntry {
                provider: Provider::Kafka(provider),
                region_id,
                entry_id: 1,
                data: vec![1; 100]
            })
        );
    }

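    // A minimal sketch of the produce-side conversion: a naive entry becomes exactly one
    // Full record whose payload is carried through unchanged.
    #[test]
    fn test_convert_to_kafka_records_naive_entry() {
        let provider = Arc::new(KafkaProvider::new("my_topic".to_string()));
        let entry = Entry::Naive(NaiveEntry {
            provider: Provider::Kafka(provider),
            region_id: RegionId::new(1, 1),
            entry_id: 1,
            data: vec![1; 100],
        });
        let mut records = convert_to_kafka_records(entry).unwrap();
        assert_eq!(records.len(), 1);
        let record = Record::try_from(records.pop().unwrap()).unwrap();
        assert_eq!(record.meta.tp, RecordType::Full);
        assert_eq!(record.data, vec![1; 100]);
    }
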
    #[test]
    fn test_maybe_emit_entry_emit_incomplete_entry() {
        let provider = Arc::new(KafkaProvider::new("my_topic".to_string()));
        let region_id = RegionId::new(1, 1);
        // A `First` record overwrites a buffered `First` record.
        let mut buffer = HashMap::new();
        let record = new_test_record(RecordType::First, 1, region_id.as_u64(), vec![1; 100]);
        assert!(maybe_emit_entry(&provider, record, &mut buffer)
            .unwrap()
            .is_none());
        let record = new_test_record(RecordType::First, 2, region_id.as_u64(), vec![2; 100]);
        let incomplete_entry = maybe_emit_entry(&provider, record, &mut buffer)
            .unwrap()
            .unwrap();

        assert_eq!(
            incomplete_entry,
            Entry::MultiplePart(MultiplePartEntry {
                provider: Provider::Kafka(provider.clone()),
                region_id,
                entry_id: 1,
                headers: vec![MultiplePartHeader::First],
                parts: vec![vec![1; 100]],
            })
        );

        // A `Last` record arriving with an empty buffer is emitted alone.
        let mut buffer = HashMap::new();
        let record = new_test_record(RecordType::Last, 1, region_id.as_u64(), vec![1; 100]);
        let incomplete_entry = maybe_emit_entry(&provider, record, &mut buffer)
            .unwrap()
            .unwrap();

        assert_eq!(
            incomplete_entry,
            Entry::MultiplePart(MultiplePartEntry {
                provider: Provider::Kafka(provider.clone()),
                region_id,
                entry_id: 1,
                headers: vec![MultiplePartHeader::Last],
                parts: vec![vec![1; 100]],
            })
        );

        // A `First` record overwrites a buffered `Middle(0)` record.
        let mut buffer = HashMap::new();
        let record = new_test_record(RecordType::Middle(0), 1, region_id.as_u64(), vec![1; 100]);
        assert!(maybe_emit_entry(&provider, record, &mut buffer)
            .unwrap()
            .is_none());
        let record = new_test_record(RecordType::First, 2, region_id.as_u64(), vec![2; 100]);
        let incomplete_entry = maybe_emit_entry(&provider, record, &mut buffer)
            .unwrap()
            .unwrap();

        assert_eq!(
            incomplete_entry,
            Entry::MultiplePart(MultiplePartEntry {
                provider: Provider::Kafka(provider),
                region_id,
                entry_id: 1,
                headers: vec![MultiplePartHeader::Middle(0)],
                parts: vec![vec![1; 100]],
            })
        );
    }

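    // A sketch of the happy path: a well-ordered `First`, `Middle(1)`, `Last` sequence
    // emits a complete multiple-part entry on the `Last` record and clears the buffer.
    #[test]
    fn test_maybe_emit_entry_emit_complete_entry() {
        let provider = Arc::new(KafkaProvider::new("my_topic".to_string()));
        let region_id = RegionId::new(1, 1);
        let mut buffer = HashMap::new();
        let record = new_test_record(RecordType::First, 1, region_id.as_u64(), vec![1; 100]);
        assert!(maybe_emit_entry(&provider, record, &mut buffer)
            .unwrap()
            .is_none());
        let record = new_test_record(RecordType::Middle(1), 1, region_id.as_u64(), vec![2; 100]);
        assert!(maybe_emit_entry(&provider, record, &mut buffer)
            .unwrap()
            .is_none());
        let record = new_test_record(RecordType::Last, 1, region_id.as_u64(), vec![3; 100]);
        let entry = maybe_emit_entry(&provider, record, &mut buffer)
            .unwrap()
            .unwrap();
        assert!(buffer.is_empty());
        assert_eq!(
            entry,
            Entry::MultiplePart(MultiplePartEntry {
                provider: Provider::Kafka(provider),
                region_id,
                entry_id: 1,
                headers: vec![
                    MultiplePartHeader::First,
                    MultiplePartHeader::Middle(1),
                    MultiplePartHeader::Last
                ],
                parts: vec![vec![1; 100], vec![2; 100], vec![3; 100]],
            })
        );
    }
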
    #[test]
    fn test_maybe_emit_entry_illegal_seq() {
        let provider = Arc::new(KafkaProvider::new("my_topic".to_string()));
        let region_id = RegionId::new(1, 1);
        // `Middle(2)` directly after `First` is illegal: the first middle part must be `Middle(1)`.
        let mut buffer = HashMap::new();
        let record = new_test_record(RecordType::First, 1, region_id.as_u64(), vec![1; 100]);
        assert!(maybe_emit_entry(&provider, record, &mut buffer)
            .unwrap()
            .is_none());
        let record = new_test_record(RecordType::Middle(2), 1, region_id.as_u64(), vec![2; 100]);
        let err = maybe_emit_entry(&provider, record, &mut buffer).unwrap_err();
        assert_matches!(err, error::Error::IllegalSequence { .. });

        // `Middle(3)` after `Middle(1)` is illegal: middle sequence numbers must be consecutive.
        let mut buffer = HashMap::new();
        let record = new_test_record(RecordType::First, 1, region_id.as_u64(), vec![1; 100]);
        assert!(maybe_emit_entry(&provider, record, &mut buffer)
            .unwrap()
            .is_none());
        let record = new_test_record(RecordType::Middle(1), 1, region_id.as_u64(), vec![2; 100]);
        assert!(maybe_emit_entry(&provider, record, &mut buffer)
            .unwrap()
            .is_none());
        let record = new_test_record(RecordType::Middle(3), 1, region_id.as_u64(), vec![2; 100]);
        let err = maybe_emit_entry(&provider, record, &mut buffer).unwrap_err();
        assert_matches!(err, error::Error::IllegalSequence { .. });
    }

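    // A sketch of draining buffered records at the end of a read: a buffered `First`
    // record is flushed as an incomplete entry and the buffer is left empty.
    #[test]
    fn test_remaining_entries_drains_buffer() {
        let provider = Arc::new(KafkaProvider::new("my_topic".to_string()));
        let region_id = RegionId::new(1, 1);
        let mut buffer = HashMap::new();
        let record = new_test_record(RecordType::First, 1, region_id.as_u64(), vec![1; 100]);
        assert!(maybe_emit_entry(&provider, record, &mut buffer)
            .unwrap()
            .is_none());

        let entries = remaining_entries(&provider, &mut buffer).unwrap();
        assert_eq!(entries.len(), 1);
        assert!(buffer.is_empty());
        // An empty buffer yields `None`.
        assert!(remaining_entries(&provider, &mut buffer).is_none());
    }
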
    #[test]
    fn test_meta_size() {
        let meta = RecordMeta {
            version: VERSION,
            tp: RecordType::Middle(usize::MAX),
            entry_id: u64::MAX,
            ns: NamespaceImpl {
                region_id: RegionId::new(u32::MAX, u32::MAX).as_u64(),
                topic: format!("greptime_kafka_cluster/1024/2048/{}", uuid::Uuid::new_v4()),
            },
        };
        let serialized = serde_json::to_vec(&meta).unwrap();
        // The len of the serialized meta is 202, well under the limit.
        assert!(serialized.len() < ESTIMATED_META_SIZE);
    }
}