log_store/kafka/util/record.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use rskafka::record::Record as KafkaRecord;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt, ensure};
use store_api::logstore::entry::{Entry, MultiplePartEntry, MultiplePartHeader, NaiveEntry};
use store_api::logstore::provider::{KafkaProvider, Provider};
use store_api::storage::RegionId;

use crate::error::{
    DecodeJsonSnafu, EncodeJsonSnafu, IllegalSequenceSnafu, MetaLengthExceededLimitSnafu,
    MissingKeySnafu, MissingValueSnafu, Result,
};
use crate::kafka::{EntryId, NamespaceImpl};

/// The current version of Record.
pub(crate) const VERSION: u32 = 0;

/// The estimated size in bytes of a serialized RecordMeta.
/// Since a serialized meta is checked to be smaller than ESTIMATED_META_SIZE, capping a
/// record's data at max_batch_bytes - ESTIMATED_META_SIZE guarantees
/// sizeof(meta) + sizeof(data) <= max_batch_bytes.
pub(crate) const ESTIMATED_META_SIZE: usize = 256;
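
// For example (hypothetical figures): with a max batch size of 1 MiB, a producer may put at
// most 1 MiB - 256 B of entry data into one record, reserving ESTIMATED_META_SIZE bytes for
// the serialized metadata.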

/// The type of a record.
///
/// - If an entry fits into a single Kafka record, it's converted into a Full record.
/// - If an entry is too large, it's split into a collection of records containing exactly
///   one First record, zero or more Middle records, and exactly one Last record.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
pub enum RecordType {
    /// The record is self-contained, i.e. an entry's data is fully stored into this record.
    Full,
    /// The record contains the first part of an entry's data.
    First,
    /// The record contains one of the middle parts of an entry's data.
    /// The sequence of the record is identified by the inner field.
    Middle(usize),
    /// The record contains the last part of an entry's data.
    Last,
}
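
// For illustration: a small entry is produced as a single `Full` record; an entry split into
// two parts is produced as `[First, Last]`, and into four parts as
// `[First, Middle(1), Middle(2), Last]` (middle sequence numbers start at 1).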

/// The metadata of a record.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct RecordMeta {
    /// The version of the record. Used for backward compatibility.
    version: u32,
    /// The type of the record.
    pub tp: RecordType,
    /// The id of the entry the record is associated with.
    pub entry_id: EntryId,
    /// The namespace of the entry the record is associated with.
    pub ns: NamespaceImpl,
}

/// The minimal storage unit in the Kafka log store.
///
/// An entry is converted into one or more Records before being produced.
/// If an entry fits into a KafkaRecord, it converts to a single Record;
/// otherwise it is split into a collection of Records.
///
/// A KafkaRecord is the minimal storage unit used by the Kafka client and the Kafka server:
/// the client produces and consumes KafkaRecords, and the server stores collections of them.
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct Record {
    /// The metadata of the record.
    pub(crate) meta: RecordMeta,
    /// The payload of the record.
    data: Vec<u8>,
}

impl TryFrom<Record> for KafkaRecord {
    type Error = crate::error::Error;

    fn try_from(record: Record) -> Result<Self> {
        let key = serde_json::to_vec(&record.meta).context(EncodeJsonSnafu)?;
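        // The serialized metadata must stay within the ESTIMATED_META_SIZE budget; otherwise
        // the meta-plus-data size accounting done against the max batch size would be violated.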
        ensure!(
            key.len() < ESTIMATED_META_SIZE,
            MetaLengthExceededLimitSnafu {
                limit: ESTIMATED_META_SIZE,
                actual: key.len()
            }
        );
        Ok(KafkaRecord {
            key: Some(key),
            value: Some(record.data),
            timestamp: chrono::Utc::now(),
            headers: Default::default(),
        })
    }
}

// TODO(niebayes): improve the performance of decoding kafka record.
impl TryFrom<KafkaRecord> for Record {
    type Error = crate::error::Error;

    fn try_from(kafka_record: KafkaRecord) -> Result<Self> {
        let key = kafka_record.key.context(MissingKeySnafu)?;
        let meta = serde_json::from_slice(&key).context(DecodeJsonSnafu)?;
        let data = kafka_record.value.context(MissingValueSnafu)?;
        Ok(Self { meta, data })
    }
}

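/// Converts an [Entry] into one or more Kafka records: a naive entry maps to a single
/// [RecordType::Full] record, while each part of a multiple-part entry maps to a
/// [RecordType::First], [RecordType::Middle], or [RecordType::Last] record.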
pub(crate) fn convert_to_kafka_records(entry: Entry) -> Result<Vec<KafkaRecord>> {
    match entry {
        Entry::Naive(entry) => Ok(vec![KafkaRecord::try_from(Record {
            meta: RecordMeta {
                version: VERSION,
                tp: RecordType::Full,
                // TODO(weny): refactor the record meta.
                entry_id: 0,
                ns: NamespaceImpl {
                    region_id: entry.region_id.as_u64(),
                    // TODO(weny): refactor the record meta.
                    topic: String::new(),
                },
            },
            data: entry.data,
        })?]),
        Entry::MultiplePart(entry) => {
            let mut entries = Vec::with_capacity(entry.parts.len());

            for (idx, part) in entry.parts.into_iter().enumerate() {
                let tp = match entry.headers[idx] {
                    MultiplePartHeader::First => RecordType::First,
                    MultiplePartHeader::Middle(i) => RecordType::Middle(i),
                    MultiplePartHeader::Last => RecordType::Last,
                };
                entries.push(KafkaRecord::try_from(Record {
                    meta: RecordMeta {
                        version: VERSION,
                        tp,
                        // TODO(weny): refactor the record meta.
                        entry_id: 0,
                        ns: NamespaceImpl {
                            region_id: entry.region_id.as_u64(),
                            topic: String::new(),
                        },
                    },
                    data: part,
                })?)
            }
            Ok(entries)
        }
    }
}

fn convert_to_naive_entry(provider: Arc<KafkaProvider>, record: Record) -> Entry {
    let region_id = RegionId::from_u64(record.meta.ns.region_id);

    Entry::Naive(NaiveEntry {
        provider: Provider::Kafka(provider),
        region_id,
        entry_id: record.meta.entry_id,
        data: record.data,
    })
}

fn convert_to_multiple_entry(
    provider: Arc<KafkaProvider>,
    region_id: RegionId,
    records: Vec<Record>,
) -> Entry {
    let mut headers = Vec::with_capacity(records.len());
    let mut parts = Vec::with_capacity(records.len());
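    // The entry id of a multiple-part entry is taken from its last buffered record.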
    let entry_id = records.last().map(|r| r.meta.entry_id).unwrap_or_default();

    for record in records {
        let header = match record.meta.tp {
            RecordType::Full => unreachable!(),
            RecordType::First => MultiplePartHeader::First,
            RecordType::Middle(i) => MultiplePartHeader::Middle(i),
            RecordType::Last => MultiplePartHeader::Last,
        };
        headers.push(header);
        parts.push(record.data);
    }

    Entry::MultiplePart(MultiplePartEntry {
        provider: Provider::Kafka(provider),
        region_id,
        entry_id,
        headers,
        parts,
    })
}

/// Drains `buffered_records` and converts each region's buffered records into a
/// (possibly incomplete) [Entry]. Returns `None` if the buffer is empty.
pub fn remaining_entries(
    provider: &Arc<KafkaProvider>,
    buffered_records: &mut HashMap<RegionId, Vec<Record>>,
) -> Option<Vec<Entry>> {
    if buffered_records.is_empty() {
        None
    } else {
        let mut entries = Vec::with_capacity(buffered_records.len());
        for (region_id, records) in buffered_records.drain() {
            entries.push(convert_to_multiple_entry(
                provider.clone(),
                region_id,
                records,
            ));
        }
        Some(entries)
    }
}

/// For a [RecordType::Full] record:
/// - Emits an [Entry::Naive] entry immediately.
///
/// For a record belonging to an [Entry::MultiplePart] entry:
/// - Emits a complete or incomplete [Entry] when the next record of the same [RegionId] arrives.
///
/// **Incomplete Entry:**
/// If records arrive in the following orders, the records in bold are emitted as an
/// **incomplete [Entry]**:
/// - **[RecordType::First], [RecordType::Middle]**, [RecordType::First]
/// - **[RecordType::Middle]**, [RecordType::First]
/// - **[RecordType::Last]**
pub(crate) fn maybe_emit_entry(
    provider: &Arc<KafkaProvider>,
    record: Record,
    buffered_records: &mut HashMap<RegionId, Vec<Record>>,
) -> Result<Option<Entry>> {
    let mut entry = None;
    match record.meta.tp {
        RecordType::Full => entry = Some(convert_to_naive_entry(provider.clone(), record)),
        RecordType::First => {
            let region_id = record.meta.ns.region_id.into();
            if let Some(records) = buffered_records.insert(region_id, vec![record]) {
                // The displaced records form an incomplete entry.
                entry = Some(convert_to_multiple_entry(
                    provider.clone(),
                    region_id,
                    records,
                ))
            }
        }
        RecordType::Middle(seq) => {
            let region_id = record.meta.ns.region_id.into();
            let records = buffered_records.entry(region_id).or_default();

            // Validate the sequence only if some parts are already buffered.
            if !records.is_empty() {
                // Safety: the records are guaranteed not empty if the key exists.
                let last_record = records.last().unwrap();
                let legal = match last_record.meta.tp {
                    // Legal if this record immediately follows a First record.
                    RecordType::First => seq == 1,
                    // Legal if this record immediately follows the buffered Middle record.
                    RecordType::Middle(last_seq) => last_seq + 1 == seq,
                    // Illegal sequence.
                    _ => false,
                };
                ensure!(
                    legal,
                    IllegalSequenceSnafu {
                        error: format!(
                            "Illegal sequence of a middle record, last record: {:?}, incoming record: {:?}",
                            last_record.meta.tp, record.meta.tp
                        )
                    }
                );
            }

            records.push(record);
        }
        RecordType::Last => {
            let region_id = record.meta.ns.region_id.into();
            if let Some(mut records) = buffered_records.remove(&region_id) {
                records.push(record);
                entry = Some(convert_to_multiple_entry(
                    provider.clone(),
                    region_id,
                    records,
                ))
            } else {
                // A `Last` with nothing buffered forms an incomplete entry on its own.
                entry = Some(convert_to_multiple_entry(
                    provider.clone(),
                    region_id,
                    vec![record],
                ))
            }
        }
    }
    Ok(entry)
}

#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::sync::Arc;

    use super::*;
    use crate::error;

    fn new_test_record(tp: RecordType, entry_id: EntryId, region_id: u64, data: Vec<u8>) -> Record {
        Record {
            meta: RecordMeta {
                version: VERSION,
                tp,
                ns: NamespaceImpl {
                    region_id,
                    topic: "greptimedb_wal_topic".to_string(),
                },
                entry_id,
            },
            data,
        }
    }
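
    // A minimal round-trip sketch: a `Record` converted to a `KafkaRecord` and back
    // should be unchanged, since the meta is serialized into the key and the data into
    // the value. (The test name is ours.)
    #[test]
    fn test_record_conversion_round_trip() {
        let record = new_test_record(RecordType::Full, 1, RegionId::new(1, 1).as_u64(), vec![1; 100]);
        let kafka_record = KafkaRecord::try_from(record.clone()).unwrap();
        assert_eq!(Record::try_from(kafka_record).unwrap(), record);
    }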

    #[test]
    fn test_maybe_emit_entry_emit_naive_entry() {
        let provider = Arc::new(KafkaProvider::new("my_topic".to_string()));
        let region_id = RegionId::new(1, 1);
        let mut buffer = HashMap::new();
        let record = new_test_record(RecordType::Full, 1, region_id.as_u64(), vec![1; 100]);
        let entry = maybe_emit_entry(&provider, record, &mut buffer)
            .unwrap()
            .unwrap();
        assert_eq!(
            entry,
            Entry::Naive(NaiveEntry {
                provider: Provider::Kafka(provider),
                region_id,
                entry_id: 1,
                data: vec![1; 100]
            })
        );
    }

    #[test]
    fn test_maybe_emit_entry_emit_incomplete_entry() {
        let provider = Arc::new(KafkaProvider::new("my_topic".to_string()));
        let region_id = RegionId::new(1, 1);
        // A second `First` displaces the buffered `First`.
        let mut buffer = HashMap::new();
        let record = new_test_record(RecordType::First, 1, region_id.as_u64(), vec![1; 100]);
        assert!(
            maybe_emit_entry(&provider, record, &mut buffer)
                .unwrap()
                .is_none()
        );
        let record = new_test_record(RecordType::First, 2, region_id.as_u64(), vec![2; 100]);
        let incomplete_entry = maybe_emit_entry(&provider, record, &mut buffer)
            .unwrap()
            .unwrap();

        assert_eq!(
            incomplete_entry,
            Entry::MultiplePart(MultiplePartEntry {
                provider: Provider::Kafka(provider.clone()),
                region_id,
                entry_id: 1,
                headers: vec![MultiplePartHeader::First],
                parts: vec![vec![1; 100]],
            })
        );

        // A `Last` arrives with nothing buffered.
        let mut buffer = HashMap::new();
        let record = new_test_record(RecordType::Last, 1, region_id.as_u64(), vec![1; 100]);
        let incomplete_entry = maybe_emit_entry(&provider, record, &mut buffer)
            .unwrap()
            .unwrap();

        assert_eq!(
            incomplete_entry,
            Entry::MultiplePart(MultiplePartEntry {
                provider: Provider::Kafka(provider.clone()),
                region_id,
                entry_id: 1,
                headers: vec![MultiplePartHeader::Last],
                parts: vec![vec![1; 100]],
            })
        );

        // A `First` displaces the buffered `Middle(0)`.
        let mut buffer = HashMap::new();
        let record = new_test_record(RecordType::Middle(0), 1, region_id.as_u64(), vec![1; 100]);
        assert!(
            maybe_emit_entry(&provider, record, &mut buffer)
                .unwrap()
                .is_none()
        );
        let record = new_test_record(RecordType::First, 2, region_id.as_u64(), vec![2; 100]);
        let incomplete_entry = maybe_emit_entry(&provider, record, &mut buffer)
            .unwrap()
            .unwrap();

        assert_eq!(
            incomplete_entry,
            Entry::MultiplePart(MultiplePartEntry {
                provider: Provider::Kafka(provider),
                region_id,
                entry_id: 1,
                headers: vec![MultiplePartHeader::Middle(0)],
                parts: vec![vec![1; 100]],
            })
        );
    }
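
    // A sketch of the happy path (not covering any new behavior): `First`, `Middle(1)`,
    // and `Last` arriving in order are assembled and emitted as one complete
    // multiple-part entry by the `Last` record, leaving the buffer empty.
    #[test]
    fn test_maybe_emit_entry_emit_complete_entry() {
        let provider = Arc::new(KafkaProvider::new("my_topic".to_string()));
        let region_id = RegionId::new(1, 1);
        let mut buffer = HashMap::new();
        let record = new_test_record(RecordType::First, 1, region_id.as_u64(), vec![1; 100]);
        assert!(
            maybe_emit_entry(&provider, record, &mut buffer)
                .unwrap()
                .is_none()
        );
        let record = new_test_record(RecordType::Middle(1), 1, region_id.as_u64(), vec![2; 100]);
        assert!(
            maybe_emit_entry(&provider, record, &mut buffer)
                .unwrap()
                .is_none()
        );
        let record = new_test_record(RecordType::Last, 1, region_id.as_u64(), vec![3; 100]);
        let entry = maybe_emit_entry(&provider, record, &mut buffer)
            .unwrap()
            .unwrap();
        assert_eq!(
            entry,
            Entry::MultiplePart(MultiplePartEntry {
                provider: Provider::Kafka(provider),
                region_id,
                entry_id: 1,
                headers: vec![
                    MultiplePartHeader::First,
                    MultiplePartHeader::Middle(1),
                    MultiplePartHeader::Last
                ],
                parts: vec![vec![1; 100], vec![2; 100], vec![3; 100]],
            })
        );
        assert!(buffer.is_empty());
    }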

    #[test]
    fn test_maybe_emit_entry_illegal_seq() {
        let provider = Arc::new(KafkaProvider::new("my_topic".to_string()));
        let region_id = RegionId::new(1, 1);
        let mut buffer = HashMap::new();
        let record = new_test_record(RecordType::First, 1, region_id.as_u64(), vec![1; 100]);
        assert!(
            maybe_emit_entry(&provider, record, &mut buffer)
                .unwrap()
                .is_none()
        );
        let record = new_test_record(RecordType::Middle(2), 1, region_id.as_u64(), vec![2; 100]);
        let err = maybe_emit_entry(&provider, record, &mut buffer).unwrap_err();
        assert_matches!(err, error::Error::IllegalSequence { .. });

        let mut buffer = HashMap::new();
        let record = new_test_record(RecordType::First, 1, region_id.as_u64(), vec![1; 100]);
        assert!(
            maybe_emit_entry(&provider, record, &mut buffer)
                .unwrap()
                .is_none()
        );
        let record = new_test_record(RecordType::Middle(1), 1, region_id.as_u64(), vec![2; 100]);
        assert!(
            maybe_emit_entry(&provider, record, &mut buffer)
                .unwrap()
                .is_none()
        );
        let record = new_test_record(RecordType::Middle(3), 1, region_id.as_u64(), vec![2; 100]);
        let err = maybe_emit_entry(&provider, record, &mut buffer).unwrap_err();
        assert_matches!(err, error::Error::IllegalSequence { .. });
    }
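
    // A sketch verifying that `remaining_entries` drains whatever is buffered into
    // (possibly incomplete) entries and yields `None` once the buffer is empty.
    // (The test name is ours.)
    #[test]
    fn test_remaining_entries_drains_buffer() {
        let provider = Arc::new(KafkaProvider::new("my_topic".to_string()));
        let region_id = RegionId::new(1, 1);
        let mut buffer = HashMap::new();
        let record = new_test_record(RecordType::First, 1, region_id.as_u64(), vec![1; 100]);
        assert!(
            maybe_emit_entry(&provider, record, &mut buffer)
                .unwrap()
                .is_none()
        );
        let entries = remaining_entries(&provider, &mut buffer).unwrap();
        assert_eq!(
            entries,
            vec![Entry::MultiplePart(MultiplePartEntry {
                provider: Provider::Kafka(provider.clone()),
                region_id,
                entry_id: 1,
                headers: vec![MultiplePartHeader::First],
                parts: vec![vec![1; 100]],
            })]
        );
        assert!(remaining_entries(&provider, &mut buffer).is_none());
    }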

    #[test]
    fn test_meta_size() {
        let meta = RecordMeta {
            version: VERSION,
            tp: RecordType::Middle(usize::MAX),
            entry_id: u64::MAX,
            ns: NamespaceImpl {
                region_id: RegionId::new(u32::MAX, u32::MAX).as_u64(),
                topic: format!("greptime_kafka_cluster/1024/2048/{}", uuid::Uuid::new_v4()),
            },
        };
        let serialized = serde_json::to_vec(&meta).unwrap();
        // The length of the serialized metadata is 202 bytes.
        assert!(serialized.len() < ESTIMATED_META_SIZE);
    }
}
469}