use std::collections::HashMap;
use std::sync::Arc;

use rskafka::record::Record as KafkaRecord;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::logstore::entry::{Entry, MultiplePartEntry, MultiplePartHeader, NaiveEntry};
use store_api::logstore::provider::{KafkaProvider, Provider};
use store_api::storage::RegionId;

use crate::error::{
    DecodeJsonSnafu, EncodeJsonSnafu, IllegalSequenceSnafu, MetaLengthExceededLimitSnafu,
    MissingKeySnafu, MissingValueSnafu, Result,
};
use crate::kafka::{EntryId, NamespaceImpl};

/// The current version of [RecordMeta].
pub(crate) const VERSION: u32 = 0;

/// The estimated size, in bytes, of a serialized [RecordMeta]. Encoding a [Record]
/// fails if its serialized metadata reaches this limit.
pub(crate) const ESTIMATED_META_SIZE: usize = 256;

/// The type of a [Record].
///
/// An entry that fits into a single Kafka record is encoded as one `Full` record.
/// A larger entry is split into a `First` record, zero or more `Middle` records,
/// and a `Last` record.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
pub enum RecordType {
    /// The record holds an entire entry.
    Full,
    /// The record holds the first part of an entry.
    First,
    /// The record holds a middle part of an entry; the inner value is the record's
    /// sequence number within the entry.
    Middle(usize),
    /// The record holds the last part of an entry.
    Last,
}

/// The metadata of a [Record].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct RecordMeta {
    /// The version of the metadata format.
    version: u32,
    /// The type of the record.
    pub tp: RecordType,
    /// The id of the entry this record belongs to.
    pub entry_id: EntryId,
    /// The namespace (topic and region) of the entry this record belongs to.
    pub ns: NamespaceImpl,
}

/// The unit of data exchanged with Kafka.
///
/// An entry is converted into one or more [Record]s before being produced:
/// a single [RecordType::Full] record if the entry fits into one Kafka record,
/// otherwise a sequence of part records ([RecordType::First] /
/// [RecordType::Middle] / [RecordType::Last]).
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct Record {
    /// The metadata of the record, stored as the Kafka record's key.
    pub(crate) meta: RecordMeta,
    /// The payload of the record, stored as the Kafka record's value.
    data: Vec<u8>,
}

impl TryFrom<Record> for KafkaRecord {
    type Error = crate::error::Error;

    fn try_from(record: Record) -> Result<Self> {
        let key = serde_json::to_vec(&record.meta).context(EncodeJsonSnafu)?;
        ensure!(
            key.len() < ESTIMATED_META_SIZE,
            MetaLengthExceededLimitSnafu {
                limit: ESTIMATED_META_SIZE,
                actual: key.len()
            }
        );
        Ok(KafkaRecord {
            key: Some(key),
            value: Some(record.data),
            timestamp: chrono::Utc::now(),
            headers: Default::default(),
        })
    }
}

impl TryFrom<KafkaRecord> for Record {
    type Error = crate::error::Error;

    fn try_from(kafka_record: KafkaRecord) -> Result<Self> {
        let key = kafka_record.key.context(MissingKeySnafu)?;
        let meta = serde_json::from_slice(&key).context(DecodeJsonSnafu)?;
        let data = kafka_record.value.context(MissingValueSnafu)?;
        Ok(Self { meta, data })
    }
}

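/// Converts an [Entry] into one or more [KafkaRecord]s.
///
/// A [NaiveEntry] maps to a single [RecordType::Full] record; a [MultiplePartEntry]
/// maps to one record per part, preserving the part headers as record types.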
pub(crate) fn convert_to_kafka_records(entry: Entry) -> Result<Vec<KafkaRecord>> {
    match entry {
        Entry::Naive(entry) => Ok(vec![KafkaRecord::try_from(Record {
            meta: RecordMeta {
                version: VERSION,
                tp: RecordType::Full,
                // The entry id is not known at this point; write a placeholder.
                entry_id: 0,
                ns: NamespaceImpl {
                    region_id: entry.region_id.as_u64(),
                    // Placeholder; the target topic is carried by the provider.
                    topic: String::new(),
                },
            },
            data: entry.data,
        })?]),
        Entry::MultiplePart(entry) => {
            let mut entries = Vec::with_capacity(entry.parts.len());

            for (idx, part) in entry.parts.into_iter().enumerate() {
                let tp = match entry.headers[idx] {
                    MultiplePartHeader::First => RecordType::First,
                    MultiplePartHeader::Middle(i) => RecordType::Middle(i),
                    MultiplePartHeader::Last => RecordType::Last,
                };
                entries.push(KafkaRecord::try_from(Record {
                    meta: RecordMeta {
                        version: VERSION,
                        tp,
                        // The entry id is not known at this point; write a placeholder.
                        entry_id: 0,
                        ns: NamespaceImpl {
                            region_id: entry.region_id.as_u64(),
                            // Placeholder; the target topic is carried by the provider.
                            topic: String::new(),
                        },
                    },
                    data: part,
                })?)
            }
            Ok(entries)
        }
    }
}

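/// Converts a [RecordType::Full] record into a [NaiveEntry].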
fn convert_to_naive_entry(provider: Arc<KafkaProvider>, record: Record) -> Entry {
    let region_id = RegionId::from_u64(record.meta.ns.region_id);

    Entry::Naive(NaiveEntry {
        provider: Provider::Kafka(provider),
        region_id,
        entry_id: record.meta.entry_id,
        data: record.data,
    })
}

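/// Assembles a sequence of part [Record]s into a [MultiplePartEntry].
///
/// The entry id is taken from the last record of the sequence.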
fn convert_to_multiple_entry(
    provider: Arc<KafkaProvider>,
    region_id: RegionId,
    records: Vec<Record>,
) -> Entry {
    let mut headers = Vec::with_capacity(records.len());
    let mut parts = Vec::with_capacity(records.len());
    let entry_id = records.last().map(|r| r.meta.entry_id).unwrap_or_default();

    for record in records {
        let header = match record.meta.tp {
            // Full records are never buffered; they are emitted immediately.
            RecordType::Full => unreachable!(),
            RecordType::First => MultiplePartHeader::First,
            RecordType::Middle(i) => MultiplePartHeader::Middle(i),
            RecordType::Last => MultiplePartHeader::Last,
        };
        headers.push(header);
        parts.push(record.data);
    }

    Entry::MultiplePart(MultiplePartEntry {
        provider: Provider::Kafka(provider),
        region_id,
        entry_id,
        headers,
        parts,
    })
}

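/// Drains `buffered_records` and converts each region's buffered records into a
/// [MultiplePartEntry], or returns `None` if nothing is buffered.
///
/// The resulting entries may be incomplete (i.e., missing their `Last` part).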
pub fn remaining_entries(
    provider: &Arc<KafkaProvider>,
    buffered_records: &mut HashMap<RegionId, Vec<Record>>,
) -> Option<Vec<Entry>> {
    if buffered_records.is_empty() {
        None
    } else {
        let mut entries = Vec::with_capacity(buffered_records.len());
        for (region_id, records) in buffered_records.drain() {
            entries.push(convert_to_multiple_entry(
                provider.clone(),
                region_id,
                records,
            ));
        }
        Some(entries)
    }
}

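/// Feeds a [Record] into `buffered_records`, emitting an [Entry] when one becomes
/// available.
///
/// - `Full`: emits a [NaiveEntry] immediately.
/// - `First`: starts buffering a new entry for the region; if the region already
///   had buffered records, they are flushed as an incomplete [MultiplePartEntry].
/// - `Middle(seq)`: appends to the region's buffer after checking that `seq`
///   continues the buffered sequence.
/// - `Last`: completes the region's buffered entry and emits it as a
///   [MultiplePartEntry]; a `Last` record with no buffered predecessors is emitted
///   as an incomplete entry on its own.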
pub(crate) fn maybe_emit_entry(
    provider: &Arc<KafkaProvider>,
    record: Record,
    buffered_records: &mut HashMap<RegionId, Vec<Record>>,
) -> Result<Option<Entry>> {
    let mut entry = None;
    match record.meta.tp {
        RecordType::Full => entry = Some(convert_to_naive_entry(provider.clone(), record)),
        RecordType::First => {
            let region_id = record.meta.ns.region_id.into();
            if let Some(records) = buffered_records.insert(region_id, vec![record]) {
                // The region already had buffered records; flush them as an
                // incomplete entry.
                entry = Some(convert_to_multiple_entry(
                    provider.clone(),
                    region_id,
                    records,
                ))
            }
        }
        RecordType::Middle(seq) => {
            let region_id = record.meta.ns.region_id.into();
            let records = buffered_records.entry(region_id).or_default();

            if !records.is_empty() {
                // Safety: `records` is not empty.
                let last_record = records.last().unwrap();
                let legal = match last_record.meta.tp {
                    // A Middle record following a First record must carry sequence 1.
                    RecordType::First => seq == 1,
                    // Consecutive Middle records must carry consecutive sequence numbers.
                    RecordType::Middle(last_seq) => last_seq + 1 == seq,
                    _ => false,
                };
                ensure!(
                    legal,
                    IllegalSequenceSnafu {
                        error: format!(
                            "Illegal sequence of a middle record, last record: {:?}, incoming record: {:?}",
                            last_record.meta.tp,
                            record.meta.tp
                        )
                    }
                );
            }

            records.push(record);
        }
        RecordType::Last => {
            let region_id = record.meta.ns.region_id.into();
            if let Some(mut records) = buffered_records.remove(&region_id) {
                records.push(record);
                entry = Some(convert_to_multiple_entry(
                    provider.clone(),
                    region_id,
                    records,
                ))
            } else {
                // No buffered predecessors; emit the Last record alone as an
                // incomplete entry.
                entry = Some(convert_to_multiple_entry(
                    provider.clone(),
                    region_id,
                    vec![record],
                ))
            }
        }
    }
    Ok(entry)
}

#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::sync::Arc;

    use super::*;
    use crate::error;

    /// Builds a test record with the given type, entry id, region id, and payload.
    fn new_test_record(tp: RecordType, entry_id: EntryId, region_id: u64, data: Vec<u8>) -> Record {
        Record {
            meta: RecordMeta {
                version: VERSION,
                tp,
                ns: NamespaceImpl {
                    region_id,
                    topic: "greptimedb_wal_topic".to_string(),
                },
                entry_id,
            },
            data,
        }
    }

    #[test]
    fn test_maybe_emit_entry_emit_naive_entry() {
        let provider = Arc::new(KafkaProvider::new("my_topic".to_string()));
        let region_id = RegionId::new(1, 1);
        let mut buffer = HashMap::new();
        let record = new_test_record(RecordType::Full, 1, region_id.as_u64(), vec![1; 100]);
        let entry = maybe_emit_entry(&provider, record, &mut buffer)
            .unwrap()
            .unwrap();
        assert_eq!(
            entry,
            Entry::Naive(NaiveEntry {
                provider: Provider::Kafka(provider),
                region_id,
                entry_id: 1,
                data: vec![1; 100]
            })
        );
    }

    #[test]
    fn test_maybe_emit_entry_emit_incomplete_entry() {
        let provider = Arc::new(KafkaProvider::new("my_topic".to_string()));
        let region_id = RegionId::new(1, 1);
        let mut buffer = HashMap::new();
        // A second First record flushes the buffered records as an incomplete entry.
        let record = new_test_record(RecordType::First, 1, region_id.as_u64(), vec![1; 100]);
        assert!(maybe_emit_entry(&provider, record, &mut buffer)
            .unwrap()
            .is_none());
        let record = new_test_record(RecordType::First, 2, region_id.as_u64(), vec![2; 100]);
        let incomplete_entry = maybe_emit_entry(&provider, record, &mut buffer)
            .unwrap()
            .unwrap();

        assert_eq!(
            incomplete_entry,
            Entry::MultiplePart(MultiplePartEntry {
                provider: Provider::Kafka(provider.clone()),
                region_id,
                entry_id: 1,
                headers: vec![MultiplePartHeader::First],
                parts: vec![vec![1; 100]],
            })
        );

        let mut buffer = HashMap::new();
        // A Last record without buffered predecessors is emitted as an incomplete entry.
        let record = new_test_record(RecordType::Last, 1, region_id.as_u64(), vec![1; 100]);
        let incomplete_entry = maybe_emit_entry(&provider, record, &mut buffer)
            .unwrap()
            .unwrap();

        assert_eq!(
            incomplete_entry,
            Entry::MultiplePart(MultiplePartEntry {
                provider: Provider::Kafka(provider.clone()),
                region_id,
                entry_id: 1,
                headers: vec![MultiplePartHeader::Last],
                parts: vec![vec![1; 100]],
            })
        );

        let mut buffer = HashMap::new();
        // A buffered Middle record is flushed as an incomplete entry by the next First.
        let record = new_test_record(RecordType::Middle(0), 1, region_id.as_u64(), vec![1; 100]);
        assert!(maybe_emit_entry(&provider, record, &mut buffer)
            .unwrap()
            .is_none());
        let record = new_test_record(RecordType::First, 2, region_id.as_u64(), vec![2; 100]);
        let incomplete_entry = maybe_emit_entry(&provider, record, &mut buffer)
            .unwrap()
            .unwrap();

        assert_eq!(
            incomplete_entry,
            Entry::MultiplePart(MultiplePartEntry {
                provider: Provider::Kafka(provider),
                region_id,
                entry_id: 1,
                headers: vec![MultiplePartHeader::Middle(0)],
                parts: vec![vec![1; 100]],
            })
        );
    }

    #[test]
    fn test_maybe_emit_entry_illegal_seq() {
        let provider = Arc::new(KafkaProvider::new("my_topic".to_string()));
        let region_id = RegionId::new(1, 1);
        let mut buffer = HashMap::new();
        let record = new_test_record(RecordType::First, 1, region_id.as_u64(), vec![1; 100]);
        assert!(maybe_emit_entry(&provider, record, &mut buffer)
            .unwrap()
            .is_none());
        let record = new_test_record(RecordType::Middle(2), 1, region_id.as_u64(), vec![2; 100]);
        let err = maybe_emit_entry(&provider, record, &mut buffer).unwrap_err();
        assert_matches!(err, error::Error::IllegalSequence { .. });

        let mut buffer = HashMap::new();
        let record = new_test_record(RecordType::First, 1, region_id.as_u64(), vec![1; 100]);
        assert!(maybe_emit_entry(&provider, record, &mut buffer)
            .unwrap()
            .is_none());
        let record = new_test_record(RecordType::Middle(1), 1, region_id.as_u64(), vec![2; 100]);
        assert!(maybe_emit_entry(&provider, record, &mut buffer)
            .unwrap()
            .is_none());
        let record = new_test_record(RecordType::Middle(3), 1, region_id.as_u64(), vec![2; 100]);
        let err = maybe_emit_entry(&provider, record, &mut buffer).unwrap_err();
        assert_matches!(err, error::Error::IllegalSequence { .. });
    }

    #[test]
    fn test_meta_size() {
        let meta = RecordMeta {
            version: VERSION,
            tp: RecordType::Middle(usize::MAX),
            entry_id: u64::MAX,
            ns: NamespaceImpl {
                region_id: RegionId::new(u32::MAX, u32::MAX).as_u64(),
                topic: format!("greptime_kafka_cluster/1024/2048/{}", uuid::Uuid::new_v4()),
            },
        };
        let serialized = serde_json::to_vec(&meta).unwrap();
        // Even worst-case metadata must stay within the estimated size.
        assert!(serialized.len() < ESTIMATED_META_SIZE);
    }
}