log_store/kafka/util/record.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use rskafka::record::Record as KafkaRecord;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::logstore::entry::{Entry, MultiplePartEntry, MultiplePartHeader, NaiveEntry};
use store_api::logstore::provider::{KafkaProvider, Provider};
use store_api::storage::RegionId;

use crate::error::{
    DecodeJsonSnafu, EncodeJsonSnafu, IllegalSequenceSnafu, MetaLengthExceededLimitSnafu,
    MissingKeySnafu, MissingValueSnafu, Result,
};
use crate::kafka::{EntryId, NamespaceImpl};

/// The current version of Record.
pub(crate) const VERSION: u32 = 0;

/// The estimated size in bytes of a serialized RecordMeta.
/// A record is guaranteed to have sizeof(meta) + sizeof(data) <= max_batch_bytes - ESTIMATED_META_SIZE.
pub(crate) const ESTIMATED_META_SIZE: usize = 256;

/// The type of a record.
///
/// - If an entry fits into a Kafka record, it's converted into a single Full record.
/// - If an entry is too large to fit into a Kafka record, it's split into a collection of records.
///   That collection must contain exactly one First record and one Last record, with zero or more
///   Middle records in between.
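///
/// For illustration, a hypothetical entry split into three parts maps to record
/// types as follows (a sketch, not tied to real sizes):
///
/// ```text
/// parts:  [part_0, part_1,    part_2]
/// types:  [First,  Middle(1), Last  ]
/// ```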
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
pub enum RecordType {
    /// The record is self-contained, i.e. an entry's data is fully stored into this record.
    Full,
    /// The record contains the first part of an entry's data.
    First,
    /// The record contains one of the middle parts of an entry's data.
    /// The sequence of the record is identified by the inner field.
    Middle(usize),
    /// The record contains the last part of an entry's data.
    Last,
}

/// The metadata of a record.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct RecordMeta {
    /// The version of the record. Used for backward compatibility.
    version: u32,
    /// The type of the record.
    pub tp: RecordType,
    /// The id of the entry this record is associated with.
    pub entry_id: EntryId,
    /// The namespace of the entry this record is associated with.
    pub ns: NamespaceImpl,
}

/// The minimal storage unit in the Kafka log store.
///
/// An entry is first converted into one or more Records before being produced.
/// If an entry fits into a KafkaRecord, it's converted into a single Record;
/// otherwise it's split into a collection of Records.
///
/// A KafkaRecord is the minimal storage unit used by the Kafka client and the Kafka server:
/// the Kafka client produces and consumes KafkaRecords, and the Kafka server stores
/// collections of KafkaRecords.
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct Record {
    /// The metadata of the record.
    pub(crate) meta: RecordMeta,
    /// The payload of the record.
    data: Vec<u8>,
}

impl TryFrom<Record> for KafkaRecord {
    type Error = crate::error::Error;

    fn try_from(record: Record) -> Result<Self> {
        let key = serde_json::to_vec(&record.meta).context(EncodeJsonSnafu)?;
        ensure!(
            key.len() < ESTIMATED_META_SIZE,
            MetaLengthExceededLimitSnafu {
                limit: ESTIMATED_META_SIZE,
                actual: key.len()
            }
        );
        Ok(KafkaRecord {
            key: Some(key),
            value: Some(record.data),
            timestamp: chrono::Utc::now(),
            headers: Default::default(),
        })
    }
}

// TODO(niebayes): improve the performance of decoding kafka record.
impl TryFrom<KafkaRecord> for Record {
    type Error = crate::error::Error;

    fn try_from(kafka_record: KafkaRecord) -> Result<Self> {
        let key = kafka_record.key.context(MissingKeySnafu)?;
        let meta = serde_json::from_slice(&key).context(DecodeJsonSnafu)?;
        let data = kafka_record.value.context(MissingValueSnafu)?;
        Ok(Self { meta, data })
    }
}

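/// Converts an [Entry] into one or more [KafkaRecord]s: a [Entry::Naive] entry
/// becomes a single [RecordType::Full] record, while each part of a
/// [Entry::MultiplePart] entry becomes its own record.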
pub(crate) fn convert_to_kafka_records(entry: Entry) -> Result<Vec<KafkaRecord>> {
    match entry {
        Entry::Naive(entry) => Ok(vec![KafkaRecord::try_from(Record {
            meta: RecordMeta {
                version: VERSION,
                tp: RecordType::Full,
                // TODO(weny): refactor the record meta.
                entry_id: 0,
                ns: NamespaceImpl {
                    region_id: entry.region_id.as_u64(),
                    // TODO(weny): refactor the record meta.
                    topic: String::new(),
                },
            },
            data: entry.data,
        })?]),
        Entry::MultiplePart(entry) => {
            let mut entries = Vec::with_capacity(entry.parts.len());

            for (idx, part) in entry.parts.into_iter().enumerate() {
                let tp = match entry.headers[idx] {
                    MultiplePartHeader::First => RecordType::First,
                    MultiplePartHeader::Middle(i) => RecordType::Middle(i),
                    MultiplePartHeader::Last => RecordType::Last,
                };
                entries.push(KafkaRecord::try_from(Record {
                    meta: RecordMeta {
                        version: VERSION,
                        tp,
                        // TODO(weny): refactor the record meta.
                        entry_id: 0,
                        ns: NamespaceImpl {
                            region_id: entry.region_id.as_u64(),
                            topic: String::new(),
                        },
                    },
                    data: part,
                })?)
            }
            Ok(entries)
        }
    }
}

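/// Converts a single [RecordType::Full] record into a [Entry::Naive] entry.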
fn convert_to_naive_entry(provider: Arc<KafkaProvider>, record: Record) -> Entry {
    let region_id = RegionId::from_u64(record.meta.ns.region_id);

    Entry::Naive(NaiveEntry {
        provider: Provider::Kafka(provider),
        region_id,
        // TODO(weny): should be the offset in the topic
        entry_id: record.meta.entry_id,
        data: record.data,
    })
}

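/// Converts a sequence of buffered records (First/Middle/Last) into a
/// [Entry::MultiplePart] entry for the given region.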
fn convert_to_multiple_entry(
    provider: Arc<KafkaProvider>,
    region_id: RegionId,
    records: Vec<Record>,
) -> Entry {
    let mut headers = Vec::with_capacity(records.len());
    let mut parts = Vec::with_capacity(records.len());

    for record in records {
        let header = match record.meta.tp {
            RecordType::Full => unreachable!(),
            RecordType::First => MultiplePartHeader::First,
            RecordType::Middle(i) => MultiplePartHeader::Middle(i),
            RecordType::Last => MultiplePartHeader::Last,
        };
        headers.push(header);
        parts.push(record.data);
    }

    Entry::MultiplePart(MultiplePartEntry {
        provider: Provider::Kafka(provider),
        region_id,
        // TODO(weny): should be the offset in the topic
        entry_id: 0,
        headers,
        parts,
    })
}

/// Constructs [Entry]s from the remaining `buffered_records`, draining the map.
pub fn remaining_entries(
    provider: &Arc<KafkaProvider>,
    buffered_records: &mut HashMap<RegionId, Vec<Record>>,
) -> Option<Vec<Entry>> {
    if buffered_records.is_empty() {
        None
    } else {
        let mut entries = Vec::with_capacity(buffered_records.len());
        for (region_id, records) in buffered_records.drain() {
            entries.push(convert_to_multiple_entry(
                provider.clone(),
                region_id,
                records,
            ));
        }
        Some(entries)
    }
}

/// For a [RecordType::Full] record (i.e. a [Entry::Naive] entry):
/// - Emits the [Entry] immediately.
///
/// For the records of a [Entry::MultiplePart] entry:
/// - Emits a complete or incomplete [Entry] when the next record of the same [RegionId] arrives.
///
/// **Incomplete Entry:**
/// If records arrive in one of the following orders, the records in **bold** are emitted as
/// an **incomplete [Entry]** when the next record arrives:
/// - **[RecordType::First], [RecordType::Middle]**, [RecordType::First]
/// - **[RecordType::Middle]**, [RecordType::First]
/// - **[RecordType::Last]**
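///
/// A minimal usage sketch (the surrounding loop is illustrative, not part of this module):
///
/// ```ignore
/// let mut buffered_records = HashMap::new();
/// let mut entries = Vec::new();
/// for record in records {
///     if let Some(entry) = maybe_emit_entry(&provider, record, &mut buffered_records)? {
///         entries.push(entry);
///     }
/// }
/// // Regions still buffered here never received their `Last` record.
/// if let Some(mut rest) = remaining_entries(&provider, &mut buffered_records) {
///     entries.append(&mut rest);
/// }
/// ```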
pub(crate) fn maybe_emit_entry(
    provider: &Arc<KafkaProvider>,
    record: Record,
    buffered_records: &mut HashMap<RegionId, Vec<Record>>,
) -> Result<Option<Entry>> {
    let mut entry = None;
    match record.meta.tp {
        RecordType::Full => entry = Some(convert_to_naive_entry(provider.clone(), record)),
        RecordType::First => {
            let region_id = record.meta.ns.region_id.into();
            if let Some(records) = buffered_records.insert(region_id, vec![record]) {
                // Incomplete entry
                entry = Some(convert_to_multiple_entry(
                    provider.clone(),
                    region_id,
                    records,
                ))
            }
        }
        RecordType::Middle(seq) => {
            let region_id = record.meta.ns.region_id.into();
            let records = buffered_records.entry(region_id).or_default();

            // Validate the sequence against the last buffered record, if any.
            if !records.is_empty() {
                // Safety: the records are guaranteed not empty if the key exists.
                let last_record = records.last().unwrap();
                let legal = match last_record.meta.tp {
                    // Legal if this Middle record directly follows a First record (sequences start at 1).
                    RecordType::First => seq == 1,
                    // Legal if this Middle record directly follows the previous Middle record.
                    RecordType::Middle(last_seq) => last_seq + 1 == seq,
                    // Illegal sequence.
                    _ => false,
                };
                ensure!(
                    legal,
                    IllegalSequenceSnafu {
                        error: format!(
                            "Illegal sequence of a middle record, last record: {:?}, incoming record: {:?}",
                            last_record.meta.tp,
                            record.meta.tp
                        )
                    }
                );
            }

            records.push(record);
        }
        RecordType::Last => {
            let region_id = record.meta.ns.region_id.into();
            if let Some(mut records) = buffered_records.remove(&region_id) {
                records.push(record);
                entry = Some(convert_to_multiple_entry(
                    provider.clone(),
                    region_id,
                    records,
                ))
            } else {
                // Incomplete entry
                entry = Some(convert_to_multiple_entry(
                    provider.clone(),
                    region_id,
                    vec![record],
                ))
            }
        }
    }
    Ok(entry)
}

#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::sync::Arc;

    use super::*;
    use crate::error;

    fn new_test_record(tp: RecordType, entry_id: EntryId, region_id: u64, data: Vec<u8>) -> Record {
        Record {
            meta: RecordMeta {
                version: VERSION,
                tp,
                ns: NamespaceImpl {
                    region_id,
                    topic: "greptimedb_wal_topic".to_string(),
                },
                entry_id,
            },
            data,
        }
    }

    #[test]
    fn test_maybe_emit_entry_emit_naive_entry() {
        let provider = Arc::new(KafkaProvider::new("my_topic".to_string()));
        let region_id = RegionId::new(1, 1);
        let mut buffer = HashMap::new();
        let record = new_test_record(RecordType::Full, 1, region_id.as_u64(), vec![1; 100]);
        let entry = maybe_emit_entry(&provider, record, &mut buffer)
            .unwrap()
            .unwrap();
        assert_eq!(
            entry,
            Entry::Naive(NaiveEntry {
                provider: Provider::Kafka(provider),
                region_id,
                entry_id: 1,
                data: vec![1; 100]
            })
        );
    }

    #[test]
    fn test_maybe_emit_entry_emit_incomplete_entry() {
        let provider = Arc::new(KafkaProvider::new("my_topic".to_string()));
        let region_id = RegionId::new(1, 1);
        // A `First` record overwrites a buffered `First`.
        let mut buffer = HashMap::new();
        let record = new_test_record(RecordType::First, 1, region_id.as_u64(), vec![1; 100]);
        assert!(maybe_emit_entry(&provider, record, &mut buffer)
            .unwrap()
            .is_none());
        let record = new_test_record(RecordType::First, 2, region_id.as_u64(), vec![2; 100]);
        let incomplete_entry = maybe_emit_entry(&provider, record, &mut buffer)
            .unwrap()
            .unwrap();

        assert_eq!(
            incomplete_entry,
            Entry::MultiplePart(MultiplePartEntry {
                provider: Provider::Kafka(provider.clone()),
                region_id,
                // TODO(weny): always be 0.
                entry_id: 0,
                headers: vec![MultiplePartHeader::First],
                parts: vec![vec![1; 100]],
            })
        );

        // A `Last` record arrives with nothing buffered.
        let mut buffer = HashMap::new();
        let record = new_test_record(RecordType::Last, 1, region_id.as_u64(), vec![1; 100]);
        let incomplete_entry = maybe_emit_entry(&provider, record, &mut buffer)
            .unwrap()
            .unwrap();

        assert_eq!(
            incomplete_entry,
            Entry::MultiplePart(MultiplePartEntry {
                provider: Provider::Kafka(provider.clone()),
                region_id,
                // TODO(weny): always be 0.
                entry_id: 0,
                headers: vec![MultiplePartHeader::Last],
                parts: vec![vec![1; 100]],
            })
        );

        // A `First` record overwrites a buffered `Middle(0)`.
        let mut buffer = HashMap::new();
        let record = new_test_record(RecordType::Middle(0), 1, region_id.as_u64(), vec![1; 100]);
        assert!(maybe_emit_entry(&provider, record, &mut buffer)
            .unwrap()
            .is_none());
        let record = new_test_record(RecordType::First, 2, region_id.as_u64(), vec![2; 100]);
        let incomplete_entry = maybe_emit_entry(&provider, record, &mut buffer)
            .unwrap()
            .unwrap();

        assert_eq!(
            incomplete_entry,
            Entry::MultiplePart(MultiplePartEntry {
                provider: Provider::Kafka(provider),
                region_id,
                // TODO(weny): always be 0.
                entry_id: 0,
                headers: vec![MultiplePartHeader::Middle(0)],
                parts: vec![vec![1; 100]],
            })
        );
    }

    #[test]
    fn test_maybe_emit_entry_illegal_seq() {
        let provider = Arc::new(KafkaProvider::new("my_topic".to_string()));
        let region_id = RegionId::new(1, 1);
        let mut buffer = HashMap::new();
        let record = new_test_record(RecordType::First, 1, region_id.as_u64(), vec![1; 100]);
        assert!(maybe_emit_entry(&provider, record, &mut buffer)
            .unwrap()
            .is_none());
        let record = new_test_record(RecordType::Middle(2), 1, region_id.as_u64(), vec![2; 100]);
        let err = maybe_emit_entry(&provider, record, &mut buffer).unwrap_err();
        assert_matches!(err, error::Error::IllegalSequence { .. });

        let mut buffer = HashMap::new();
        let record = new_test_record(RecordType::First, 1, region_id.as_u64(), vec![1; 100]);
        assert!(maybe_emit_entry(&provider, record, &mut buffer)
            .unwrap()
            .is_none());
        let record = new_test_record(RecordType::Middle(1), 1, region_id.as_u64(), vec![2; 100]);
        assert!(maybe_emit_entry(&provider, record, &mut buffer)
            .unwrap()
            .is_none());
        let record = new_test_record(RecordType::Middle(3), 1, region_id.as_u64(), vec![2; 100]);
        let err = maybe_emit_entry(&provider, record, &mut buffer).unwrap_err();
        assert_matches!(err, error::Error::IllegalSequence { .. });
    }

    #[test]
    fn test_meta_size() {
        let meta = RecordMeta {
            version: VERSION,
            tp: RecordType::Middle(usize::MAX),
            entry_id: u64::MAX,
            ns: NamespaceImpl {
                region_id: RegionId::new(u32::MAX, u32::MAX).as_u64(),
                topic: format!("greptime_kafka_cluster/1024/2048/{}", uuid::Uuid::new_v4()),
            },
        };
        let serialized = serde_json::to_vec(&meta).unwrap();
        // The len of serialized data is 202.
        assert!(serialized.len() < ESTIMATED_META_SIZE);
    }
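
    // A minimal round-trip sketch (an added test, not from the original module):
    // converting a Record into a KafkaRecord and back should preserve both the
    // metadata and the payload.
    #[test]
    fn test_record_conversion_round_trip() {
        let record = new_test_record(RecordType::Full, 1, RegionId::new(1, 1).as_u64(), vec![1; 8]);
        let kafka_record = KafkaRecord::try_from(record.clone()).unwrap();
        let decoded = Record::try_from(kafka_record).unwrap();
        assert_eq!(record, decoded);
    }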
}