use std::collections::HashMap;
use std::sync::Arc;

use rskafka::record::Record as KafkaRecord;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt, ensure};
use store_api::logstore::entry::{Entry, MultiplePartEntry, MultiplePartHeader, NaiveEntry};
use store_api::logstore::provider::{KafkaProvider, Provider};
use store_api::storage::RegionId;

use crate::error::{
    DecodeJsonSnafu, EncodeJsonSnafu, IllegalSequenceSnafu, MetaLengthExceededLimitSnafu,
    MissingKeySnafu, MissingValueSnafu, Result,
};
use crate::kafka::{EntryId, NamespaceImpl};

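/// The version of the record format.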
pub(crate) const VERSION: u32 = 0;

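/// The estimated size, in bytes, of a serialized [RecordMeta]; the serialized
/// metadata must stay strictly below this limit when encoding to a Kafka record.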
pub(crate) const ESTIMATED_META_SIZE: usize = 256;

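/// The type of a record.
///
/// An entry that fits into a single Kafka record is encoded as one `Full`
/// record. A larger entry is split into a `First` record, zero or more
/// `Middle` records, and a `Last` record.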
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
pub enum RecordType {
    /// The record holds an entire entry.
    Full,
    /// The record holds the first part of an entry.
    First,
    /// The record holds a middle part of an entry; the inner value is the
    /// sequence number of the part within the entry.
    Middle(usize),
    /// The record holds the last part of an entry.
    Last,
}

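/// The metadata of a record.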
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct RecordMeta {
    /// The version of the record format.
    version: u32,
    /// The type of the record.
    pub tp: RecordType,
    /// The id of the entry the record belongs to.
    pub entry_id: EntryId,
    /// The namespace of the entry the record belongs to.
    pub ns: NamespaceImpl,
}

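/// The smallest unit stored in Kafka by the log store: record metadata plus a
/// chunk of (possibly partial) entry data.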
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct Record {
    /// The metadata of the record.
    pub(crate) meta: RecordMeta,
    /// The payload of the record.
    data: Vec<u8>,
}

impl TryFrom<Record> for KafkaRecord {
    type Error = crate::error::Error;

    fn try_from(record: Record) -> Result<Self> {
        let key = serde_json::to_vec(&record.meta).context(EncodeJsonSnafu)?;
        ensure!(
            key.len() < ESTIMATED_META_SIZE,
            MetaLengthExceededLimitSnafu {
                limit: ESTIMATED_META_SIZE,
                actual: key.len()
            }
        );
        Ok(KafkaRecord {
            key: Some(key),
            value: Some(record.data),
            timestamp: chrono::Utc::now(),
            headers: Default::default(),
        })
    }
}

impl TryFrom<KafkaRecord> for Record {
    type Error = crate::error::Error;

    fn try_from(kafka_record: KafkaRecord) -> Result<Self> {
        let key = kafka_record.key.context(MissingKeySnafu)?;
        let meta = serde_json::from_slice(&key).context(DecodeJsonSnafu)?;
        let data = kafka_record.value.context(MissingValueSnafu)?;
        Ok(Self { meta, data })
    }
}

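/// Converts an [Entry] into one or more [KafkaRecord]s: a naive entry becomes a
/// single `Full` record, and each part of a multiple-part entry becomes its own
/// record.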
pub(crate) fn convert_to_kafka_records(entry: Entry) -> Result<Vec<KafkaRecord>> {
    match entry {
        Entry::Naive(entry) => Ok(vec![KafkaRecord::try_from(Record {
            meta: RecordMeta {
                version: VERSION,
                tp: RecordType::Full,
                // A placeholder; the real entry id is only known once the
                // record is stored.
                entry_id: 0,
                ns: NamespaceImpl {
                    region_id: entry.region_id.as_u64(),
                    // The topic is not needed at encode time and is left empty.
                    topic: String::new(),
                },
            },
            data: entry.data,
        })?]),
        Entry::MultiplePart(entry) => {
            let mut entries = Vec::with_capacity(entry.parts.len());

            for (idx, part) in entry.parts.into_iter().enumerate() {
                let tp = match entry.headers[idx] {
                    MultiplePartHeader::First => RecordType::First,
                    MultiplePartHeader::Middle(i) => RecordType::Middle(i),
                    MultiplePartHeader::Last => RecordType::Last,
                };
                entries.push(KafkaRecord::try_from(Record {
                    meta: RecordMeta {
                        version: VERSION,
                        tp,
                        // Same placeholders as in the naive case above.
                        entry_id: 0,
                        ns: NamespaceImpl {
                            region_id: entry.region_id.as_u64(),
                            topic: String::new(),
                        },
                    },
                    data: part,
                })?)
            }
            Ok(entries)
        }
    }
}

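/// Converts a `Full` record into a naive entry.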
fn convert_to_naive_entry(provider: Arc<KafkaProvider>, record: Record) -> Entry {
    let region_id = RegionId::from_u64(record.meta.ns.region_id);

    Entry::Naive(NaiveEntry {
        provider: Provider::Kafka(provider),
        region_id,
        entry_id: record.meta.entry_id,
        data: record.data,
    })
}

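/// Assembles part records (`First`/`Middle`/`Last`, never `Full`) into a
/// multiple-part entry. The entry id is taken from the last record.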
fn convert_to_multiple_entry(
    provider: Arc<KafkaProvider>,
    region_id: RegionId,
    records: Vec<Record>,
) -> Entry {
    let mut headers = Vec::with_capacity(records.len());
    let mut parts = Vec::with_capacity(records.len());
    let entry_id = records.last().map(|r| r.meta.entry_id).unwrap_or_default();

    for record in records {
        let header = match record.meta.tp {
            // `Full` records are emitted directly and never buffered as parts.
            RecordType::Full => unreachable!(),
            RecordType::First => MultiplePartHeader::First,
            RecordType::Middle(i) => MultiplePartHeader::Middle(i),
            RecordType::Last => MultiplePartHeader::Last,
        };
        headers.push(header);
        parts.push(record.data);
    }

    Entry::MultiplePart(MultiplePartEntry {
        provider: Provider::Kafka(provider),
        region_id,
        entry_id,
        headers,
        parts,
    })
}

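/// Drains `buffered_records` and converts the records buffered for each region
/// into a (possibly incomplete) multiple-part entry. Returns `None` if nothing
/// is buffered.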
pub fn remaining_entries(
    provider: &Arc<KafkaProvider>,
    buffered_records: &mut HashMap<RegionId, Vec<Record>>,
) -> Option<Vec<Entry>> {
    if buffered_records.is_empty() {
        None
    } else {
        let mut entries = Vec::with_capacity(buffered_records.len());
        for (region_id, records) in buffered_records.drain() {
            entries.push(convert_to_multiple_entry(
                provider.clone(),
                region_id,
                records,
            ));
        }
        Some(entries)
    }
}

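/// Buffers the record or emits an [Entry], depending on the record type:
///
/// - `Full`: emits a naive entry immediately.
/// - `First`: starts buffering a new entry for the record's region; any records
///   already buffered for that region are flushed as an incomplete entry.
/// - `Middle`: appends the record to the region's buffer after checking that its
///   sequence number follows the previously buffered part.
/// - `Last`: emits the buffered parts plus this record as a multiple-part entry.
///
/// Returns an error if a `Middle` record arrives out of sequence.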
pub(crate) fn maybe_emit_entry(
    provider: &Arc<KafkaProvider>,
    record: Record,
    buffered_records: &mut HashMap<RegionId, Vec<Record>>,
) -> Result<Option<Entry>> {
    let mut entry = None;
    match record.meta.tp {
        RecordType::Full => entry = Some(convert_to_naive_entry(provider.clone(), record)),
        RecordType::First => {
            let region_id = record.meta.ns.region_id.into();
            if let Some(records) = buffered_records.insert(region_id, vec![record]) {
                // A `First` record implies a new entry; flush whatever was
                // buffered for this region as an incomplete entry.
                entry = Some(convert_to_multiple_entry(
                    provider.clone(),
                    region_id,
                    records,
                ))
            }
        }
        RecordType::Middle(seq) => {
            let region_id = record.meta.ns.region_id.into();
            let records = buffered_records.entry(region_id).or_default();

            if !records.is_empty() {
                // Safety: `records` is not empty.
                let last_record = records.last().unwrap();
                let legal = match last_record.meta.tp {
                    // The first `Middle` record must carry sequence number 1.
                    RecordType::First => seq == 1,
                    // Subsequent `Middle` records must be consecutive.
                    RecordType::Middle(last_seq) => last_seq + 1 == seq,
                    _ => false,
                };
                ensure!(
                    legal,
                    IllegalSequenceSnafu {
                        error: format!(
                            "Illegal sequence of a middle record, last record: {:?}, incoming record: {:?}",
                            last_record.meta.tp, record.meta.tp
                        )
                    }
                );
            }

            records.push(record);
        }
        RecordType::Last => {
            let region_id = record.meta.ns.region_id.into();
            if let Some(mut records) = buffered_records.remove(&region_id) {
                records.push(record);
                entry = Some(convert_to_multiple_entry(
                    provider.clone(),
                    region_id,
                    records,
                ))
            } else {
                // Nothing is buffered for this region; emit an incomplete entry
                // containing only the `Last` part.
                entry = Some(convert_to_multiple_entry(
                    provider.clone(),
                    region_id,
                    vec![record],
                ))
            }
        }
    }
    Ok(entry)
}

#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::sync::Arc;

    use super::*;
    use crate::error;

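    /// Builds a test record with the given type, entry id, region id, and payload.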
    fn new_test_record(tp: RecordType, entry_id: EntryId, region_id: u64, data: Vec<u8>) -> Record {
        Record {
            meta: RecordMeta {
                version: VERSION,
                tp,
                ns: NamespaceImpl {
                    region_id,
                    topic: "greptimedb_wal_topic".to_string(),
                },
                entry_id,
            },
            data,
        }
    }

    #[test]
    fn test_maybe_emit_entry_emit_naive_entry() {
        let provider = Arc::new(KafkaProvider::new("my_topic".to_string()));
        let region_id = RegionId::new(1, 1);
        let mut buffer = HashMap::new();
        let record = new_test_record(RecordType::Full, 1, region_id.as_u64(), vec![1; 100]);
        let entry = maybe_emit_entry(&provider, record, &mut buffer)
            .unwrap()
            .unwrap();
        assert_eq!(
            entry,
            Entry::Naive(NaiveEntry {
                provider: Provider::Kafka(provider),
                region_id,
                entry_id: 1,
                data: vec![1; 100]
            })
        );
    }

    #[test]
    fn test_maybe_emit_entry_emit_incomplete_entry() {
        let provider = Arc::new(KafkaProvider::new("my_topic".to_string()));
        let region_id = RegionId::new(1, 1);
        let mut buffer = HashMap::new();
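        // A `First` record alone does not complete an entry; a second `First`
        // record for the same region flushes the buffered one as an incomplete
        // entry.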
        let record = new_test_record(RecordType::First, 1, region_id.as_u64(), vec![1; 100]);
        assert!(
            maybe_emit_entry(&provider, record, &mut buffer)
                .unwrap()
                .is_none()
        );
        let record = new_test_record(RecordType::First, 2, region_id.as_u64(), vec![2; 100]);
        let incomplete_entry = maybe_emit_entry(&provider, record, &mut buffer)
            .unwrap()
            .unwrap();

        assert_eq!(
            incomplete_entry,
            Entry::MultiplePart(MultiplePartEntry {
                provider: Provider::Kafka(provider.clone()),
                region_id,
                entry_id: 1,
                headers: vec![MultiplePartHeader::First],
                parts: vec![vec![1; 100]],
            })
        );

        let mut buffer = HashMap::new();
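        // A `Last` record with nothing buffered is emitted on its own as an
        // incomplete entry.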
        let record = new_test_record(RecordType::Last, 1, region_id.as_u64(), vec![1; 100]);
        let incomplete_entry = maybe_emit_entry(&provider, record, &mut buffer)
            .unwrap()
            .unwrap();

        assert_eq!(
            incomplete_entry,
            Entry::MultiplePart(MultiplePartEntry {
                provider: Provider::Kafka(provider.clone()),
                region_id,
                entry_id: 1,
                headers: vec![MultiplePartHeader::Last],
                parts: vec![vec![1; 100]],
            })
        );

        let mut buffer = HashMap::new();
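        // A buffered `Middle` record is flushed as an incomplete entry once a
        // `First` record for the same region arrives.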
        let record = new_test_record(RecordType::Middle(0), 1, region_id.as_u64(), vec![1; 100]);
        assert!(
            maybe_emit_entry(&provider, record, &mut buffer)
                .unwrap()
                .is_none()
        );
        let record = new_test_record(RecordType::First, 2, region_id.as_u64(), vec![2; 100]);
        let incomplete_entry = maybe_emit_entry(&provider, record, &mut buffer)
            .unwrap()
            .unwrap();

        assert_eq!(
            incomplete_entry,
            Entry::MultiplePart(MultiplePartEntry {
                provider: Provider::Kafka(provider),
                region_id,
                entry_id: 1,
                headers: vec![MultiplePartHeader::Middle(0)],
                parts: vec![vec![1; 100]],
            })
        );
    }

    #[test]
    fn test_maybe_emit_entry_illegal_seq() {
        let provider = Arc::new(KafkaProvider::new("my_topic".to_string()));
        let region_id = RegionId::new(1, 1);
        let mut buffer = HashMap::new();
        let record = new_test_record(RecordType::First, 1, region_id.as_u64(), vec![1; 100]);
        assert!(
            maybe_emit_entry(&provider, record, &mut buffer)
                .unwrap()
                .is_none()
        );
        // A `Middle` record directly after `First` must carry sequence number 1.
        let record = new_test_record(RecordType::Middle(2), 1, region_id.as_u64(), vec![2; 100]);
        let err = maybe_emit_entry(&provider, record, &mut buffer).unwrap_err();
        assert_matches!(err, error::Error::IllegalSequence { .. });

        let mut buffer = HashMap::new();
        let record = new_test_record(RecordType::First, 1, region_id.as_u64(), vec![1; 100]);
        assert!(
            maybe_emit_entry(&provider, record, &mut buffer)
                .unwrap()
                .is_none()
        );
        let record = new_test_record(RecordType::Middle(1), 1, region_id.as_u64(), vec![2; 100]);
        assert!(
            maybe_emit_entry(&provider, record, &mut buffer)
                .unwrap()
                .is_none()
        );
        // Consecutive `Middle` records must have consecutive sequence numbers.
        let record = new_test_record(RecordType::Middle(3), 1, region_id.as_u64(), vec![2; 100]);
        let err = maybe_emit_entry(&provider, record, &mut buffer).unwrap_err();
        assert_matches!(err, error::Error::IllegalSequence { .. });
    }

    #[test]
    fn test_meta_size() {
        let meta = RecordMeta {
            version: VERSION,
            tp: RecordType::Middle(usize::MAX),
            entry_id: u64::MAX,
            ns: NamespaceImpl {
                region_id: RegionId::new(u32::MAX, u32::MAX).as_u64(),
                topic: format!("greptime_kafka_cluster/1024/2048/{}", uuid::Uuid::new_v4()),
            },
        };
        let serialized = serde_json::to_vec(&meta).unwrap();
        // Even the largest representable metadata must stay within the
        // estimated limit.
        assert!(serialized.len() < ESTIMATED_META_SIZE);
    }
}