use std::collections::HashMap;
use std::sync::Arc;

use rskafka::record::Record as KafkaRecord;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::logstore::entry::{Entry, MultiplePartEntry, MultiplePartHeader, NaiveEntry};
use store_api::logstore::provider::{KafkaProvider, Provider};
use store_api::storage::RegionId;

use crate::error::{
    DecodeJsonSnafu, EncodeJsonSnafu, IllegalSequenceSnafu, MetaLengthExceededLimitSnafu,
    MissingKeySnafu, MissingValueSnafu, Result,
};
use crate::kafka::{EntryId, NamespaceImpl};

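/// The current version of the record format.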
pub(crate) const VERSION: u32 = 0;

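/// The estimated size limit, in bytes, of an encoded [RecordMeta]. Encoding a
/// record fails if its serialized metadata reaches this limit.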
pub(crate) const ESTIMATED_META_SIZE: usize = 256;

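/// The type of a [Record].
///
/// An entry that fits into a single Kafka record is encoded as one `Full`
/// record; a larger entry is split into a `First` record, zero or more
/// `Middle` records, and a `Last` record.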
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
pub enum RecordType {
    /// The record holds an entire entry.
    Full,
    /// The record holds the first part of an entry.
    First,
    /// The record holds a middle part of an entry; the inner value is its
    /// sequence number within the entry, starting from 1.
    Middle(usize),
    /// The record holds the last part of an entry.
    Last,
}

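/// The metadata of a [Record], serialized as JSON into the key of the
/// corresponding Kafka record.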
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct RecordMeta {
    /// The version of the record format.
    version: u32,
    /// The type of the record.
    pub tp: RecordType,
    /// The id of the entry this record belongs to.
    pub entry_id: EntryId,
    /// The namespace (region and topic) of the entry this record belongs to.
    pub ns: NamespaceImpl,
}

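/// A log-store record: the unit an [Entry] is encoded into before producing
/// and decoded from after consuming. Each [Record] maps to one [KafkaRecord].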
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct Record {
    /// The metadata of the record.
    pub(crate) meta: RecordMeta,
    /// The payload of the record: an entry's data or one part of it.
    data: Vec<u8>,
}

impl TryFrom<Record> for KafkaRecord {
    type Error = crate::error::Error;

    fn try_from(record: Record) -> Result<Self> {
        let key = serde_json::to_vec(&record.meta).context(EncodeJsonSnafu)?;
        // The serialized metadata becomes the record key and must stay within
        // the size reserved for it.
        ensure!(
            key.len() < ESTIMATED_META_SIZE,
            MetaLengthExceededLimitSnafu {
                limit: ESTIMATED_META_SIZE,
                actual: key.len()
            }
        );
        Ok(KafkaRecord {
            key: Some(key),
            value: Some(record.data),
            timestamp: chrono::Utc::now(),
            headers: Default::default(),
        })
    }
}

impl TryFrom<KafkaRecord> for Record {
    type Error = crate::error::Error;

    fn try_from(kafka_record: KafkaRecord) -> Result<Self> {
        let key = kafka_record.key.context(MissingKeySnafu)?;
        let meta = serde_json::from_slice(&key).context(DecodeJsonSnafu)?;
        let data = kafka_record.value.context(MissingValueSnafu)?;
        Ok(Self { meta, data })
    }
}

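/// Encodes an [Entry] into Kafka records: a naive entry becomes a single
/// `Full` record, while a multiple-part entry yields one record per part.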
pub(crate) fn convert_to_kafka_records(entry: Entry) -> Result<Vec<KafkaRecord>> {
    match entry {
        Entry::Naive(entry) => Ok(vec![KafkaRecord::try_from(Record {
            meta: RecordMeta {
                version: VERSION,
                tp: RecordType::Full,
                // The entry id is left as a placeholder during encoding.
                entry_id: 0,
                ns: NamespaceImpl {
                    region_id: entry.region_id.as_u64(),
                    // The topic is not needed at encoding time; leave it empty.
                    topic: String::new(),
                },
            },
            data: entry.data,
        })?]),
        Entry::MultiplePart(entry) => {
            let mut entries = Vec::with_capacity(entry.parts.len());

            for (idx, part) in entry.parts.into_iter().enumerate() {
                let tp = match entry.headers[idx] {
                    MultiplePartHeader::First => RecordType::First,
                    MultiplePartHeader::Middle(i) => RecordType::Middle(i),
                    MultiplePartHeader::Last => RecordType::Last,
                };
                entries.push(KafkaRecord::try_from(Record {
                    meta: RecordMeta {
                        version: VERSION,
                        tp,
                        entry_id: 0,
                        ns: NamespaceImpl {
                            region_id: entry.region_id.as_u64(),
                            topic: String::new(),
                        },
                    },
                    data: part,
                })?)
            }
            Ok(entries)
        }
    }
}

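/// Converts a single `Full` record back into a naive [Entry].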
fn convert_to_naive_entry(provider: Arc<KafkaProvider>, record: Record) -> Entry {
    let region_id = RegionId::from_u64(record.meta.ns.region_id);

    Entry::Naive(NaiveEntry {
        provider: Provider::Kafka(provider),
        region_id,
        entry_id: record.meta.entry_id,
        data: record.data,
    })
}

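/// Assembles a sequence of part records belonging to one region into a
/// multiple-part [Entry].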
fn convert_to_multiple_entry(
    provider: Arc<KafkaProvider>,
    region_id: RegionId,
    records: Vec<Record>,
) -> Entry {
    let mut headers = Vec::with_capacity(records.len());
    let mut parts = Vec::with_capacity(records.len());

    for record in records {
        let header = match record.meta.tp {
            // Full records are emitted directly by `maybe_emit_entry` and are
            // never buffered, so they cannot appear here.
            RecordType::Full => unreachable!(),
            RecordType::First => MultiplePartHeader::First,
            RecordType::Middle(i) => MultiplePartHeader::Middle(i),
            RecordType::Last => MultiplePartHeader::Last,
        };
        headers.push(header);
        parts.push(record.data);
    }

    Entry::MultiplePart(MultiplePartEntry {
        provider: Provider::Kafka(provider),
        region_id,
        // The reassembled entry carries a placeholder entry id.
        entry_id: 0,
        headers,
        parts,
    })
}

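/// Drains all buffered records, converting each region's parts into an
/// (incomplete) multiple-part [Entry]. Returns `None` if nothing is buffered.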
pub fn remaining_entries(
    provider: &Arc<KafkaProvider>,
    buffered_records: &mut HashMap<RegionId, Vec<Record>>,
) -> Option<Vec<Entry>> {
    if buffered_records.is_empty() {
        None
    } else {
        let mut entries = Vec::with_capacity(buffered_records.len());
        for (region_id, records) in buffered_records.drain() {
            entries.push(convert_to_multiple_entry(
                provider.clone(),
                region_id,
                records,
            ));
        }
        Some(entries)
    }
}

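/// Feeds a record into the per-region buffer and emits an [Entry] when one is
/// ready:
///
/// - A `Full` record is emitted immediately as a naive entry.
/// - A `First` record starts a new buffer for its region; any records it
///   displaces are emitted as an incomplete multiple-part entry.
/// - A `Middle` record is appended to the buffer after its sequence number is
///   validated against the preceding record.
/// - A `Last` record completes and emits the buffered multiple-part entry, or
///   is emitted alone as an incomplete entry if nothing was buffered.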
pub(crate) fn maybe_emit_entry(
    provider: &Arc<KafkaProvider>,
    record: Record,
    buffered_records: &mut HashMap<RegionId, Vec<Record>>,
) -> Result<Option<Entry>> {
    let mut entry = None;
    match record.meta.tp {
        RecordType::Full => entry = Some(convert_to_naive_entry(provider.clone(), record)),
        RecordType::First => {
            let region_id = record.meta.ns.region_id.into();
            // A new `First` record displaces any previously buffered records,
            // which are emitted as an incomplete entry.
            if let Some(records) = buffered_records.insert(region_id, vec![record]) {
                entry = Some(convert_to_multiple_entry(
                    provider.clone(),
                    region_id,
                    records,
                ))
            }
        }
        RecordType::Middle(seq) => {
            let region_id = record.meta.ns.region_id.into();
            let records = buffered_records.entry(region_id).or_default();

            if !records.is_empty() {
                // Safety: `records` is non-empty, so `last()` cannot fail.
                let last_record = records.last().unwrap();
                // Sequence numbers must increase monotonically: `First` is
                // followed by `Middle(1)`, and `Middle(n)` by `Middle(n + 1)`.
                let legal = match last_record.meta.tp {
                    RecordType::First => seq == 1,
                    RecordType::Middle(last_seq) => last_seq + 1 == seq,
                    _ => false,
                };
                ensure!(
                    legal,
                    IllegalSequenceSnafu {
                        error: format!(
                            "Illegal sequence of a middle record, last record: {:?}, incoming record: {:?}",
                            last_record.meta.tp,
                            record.meta.tp
                        )
                    }
                );
            }

            records.push(record);
        }
        RecordType::Last => {
            let region_id = record.meta.ns.region_id.into();
            if let Some(mut records) = buffered_records.remove(&region_id) {
                records.push(record);
                entry = Some(convert_to_multiple_entry(
                    provider.clone(),
                    region_id,
                    records,
                ))
            } else {
                // No buffered parts for this region: emit the `Last` record
                // alone as an incomplete entry.
                entry = Some(convert_to_multiple_entry(
                    provider.clone(),
                    region_id,
                    vec![record],
                ))
            }
        }
    }
    Ok(entry)
}

#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::sync::Arc;

    use super::*;
    use crate::error;

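    /// Builds a test [Record] with the given type, entry id, region id, and payload.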
    fn new_test_record(tp: RecordType, entry_id: EntryId, region_id: u64, data: Vec<u8>) -> Record {
        Record {
            meta: RecordMeta {
                version: VERSION,
                tp,
                ns: NamespaceImpl {
                    region_id,
                    topic: "greptimedb_wal_topic".to_string(),
                },
                entry_id,
            },
            data,
        }
    }

    #[test]
    fn test_maybe_emit_entry_emit_naive_entry() {
        let provider = Arc::new(KafkaProvider::new("my_topic".to_string()));
        let region_id = RegionId::new(1, 1);
        let mut buffer = HashMap::new();
        // A `Full` record is emitted immediately as a naive entry.
        let record = new_test_record(RecordType::Full, 1, region_id.as_u64(), vec![1; 100]);
        let entry = maybe_emit_entry(&provider, record, &mut buffer)
            .unwrap()
            .unwrap();
        assert_eq!(
            entry,
            Entry::Naive(NaiveEntry {
                provider: Provider::Kafka(provider),
                region_id,
                entry_id: 1,
                data: vec![1; 100]
            })
        );
    }

    #[test]
    fn test_maybe_emit_entry_emit_incomplete_entry() {
        let provider = Arc::new(KafkaProvider::new("my_topic".to_string()));
        let region_id = RegionId::new(1, 1);
        let mut buffer = HashMap::new();
        // A second `First` record displaces the buffered one, which is emitted
        // as an incomplete entry.
        let record = new_test_record(RecordType::First, 1, region_id.as_u64(), vec![1; 100]);
        assert!(maybe_emit_entry(&provider, record, &mut buffer)
            .unwrap()
            .is_none());
        let record = new_test_record(RecordType::First, 2, region_id.as_u64(), vec![2; 100]);
        let incomplete_entry = maybe_emit_entry(&provider, record, &mut buffer)
            .unwrap()
            .unwrap();

        assert_eq!(
            incomplete_entry,
            Entry::MultiplePart(MultiplePartEntry {
                provider: Provider::Kafka(provider.clone()),
                region_id,
                entry_id: 0,
                headers: vec![MultiplePartHeader::First],
                parts: vec![vec![1; 100]],
            })
        );

        let mut buffer = HashMap::new();
        // A `Last` record with no buffered parts is emitted alone as an
        // incomplete entry.
        let record = new_test_record(RecordType::Last, 1, region_id.as_u64(), vec![1; 100]);
        let incomplete_entry = maybe_emit_entry(&provider, record, &mut buffer)
            .unwrap()
            .unwrap();

        assert_eq!(
            incomplete_entry,
            Entry::MultiplePart(MultiplePartEntry {
                provider: Provider::Kafka(provider.clone()),
                region_id,
                entry_id: 0,
                headers: vec![MultiplePartHeader::Last],
                parts: vec![vec![1; 100]],
            })
        );

        let mut buffer = HashMap::new();
        // A `First` record displaces a buffered `Middle` record, which is
        // emitted as an incomplete entry.
        let record = new_test_record(RecordType::Middle(0), 1, region_id.as_u64(), vec![1; 100]);
        assert!(maybe_emit_entry(&provider, record, &mut buffer)
            .unwrap()
            .is_none());
        let record = new_test_record(RecordType::First, 2, region_id.as_u64(), vec![2; 100]);
        let incomplete_entry = maybe_emit_entry(&provider, record, &mut buffer)
            .unwrap()
            .unwrap();

        assert_eq!(
            incomplete_entry,
            Entry::MultiplePart(MultiplePartEntry {
                provider: Provider::Kafka(provider),
                region_id,
                entry_id: 0,
                headers: vec![MultiplePartHeader::Middle(0)],
                parts: vec![vec![1; 100]],
            })
        );
    }

    #[test]
    fn test_maybe_emit_entry_illegal_seq() {
        let provider = Arc::new(KafkaProvider::new("my_topic".to_string()));
        let region_id = RegionId::new(1, 1);
        let mut buffer = HashMap::new();
        // `First` must be followed by `Middle(1)`, so `Middle(2)` is illegal.
        let record = new_test_record(RecordType::First, 1, region_id.as_u64(), vec![1; 100]);
        assert!(maybe_emit_entry(&provider, record, &mut buffer)
            .unwrap()
            .is_none());
        let record = new_test_record(RecordType::Middle(2), 1, region_id.as_u64(), vec![2; 100]);
        let err = maybe_emit_entry(&provider, record, &mut buffer).unwrap_err();
        assert_matches!(err, error::Error::IllegalSequence { .. });

        let mut buffer = HashMap::new();
        // `Middle(1)` must be followed by `Middle(2)`, so `Middle(3)` is illegal.
        let record = new_test_record(RecordType::First, 1, region_id.as_u64(), vec![1; 100]);
        assert!(maybe_emit_entry(&provider, record, &mut buffer)
            .unwrap()
            .is_none());
        let record = new_test_record(RecordType::Middle(1), 1, region_id.as_u64(), vec![2; 100]);
        assert!(maybe_emit_entry(&provider, record, &mut buffer)
            .unwrap()
            .is_none());
        let record = new_test_record(RecordType::Middle(3), 1, region_id.as_u64(), vec![2; 100]);
        let err = maybe_emit_entry(&provider, record, &mut buffer).unwrap_err();
        assert_matches!(err, error::Error::IllegalSequence { .. });
    }

    #[test]
    fn test_meta_size() {
        let meta = RecordMeta {
            version: VERSION,
            tp: RecordType::Middle(usize::MAX),
            entry_id: u64::MAX,
            ns: NamespaceImpl {
                region_id: RegionId::new(u32::MAX, u32::MAX).as_u64(),
                topic: format!("greptime_kafka_cluster/1024/2048/{}", uuid::Uuid::new_v4()),
            },
        };
        let serialized = serde_json::to_vec(&meta).unwrap();
        // Even a worst-case metadata instance must serialize below the limit.
        assert!(serialized.len() < ESTIMATED_META_SIZE);
    }
}