use std::collections::HashMap;
use std::sync::Arc;
use rskafka::record::Record as KafkaRecord;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::logstore::entry::{Entry, MultiplePartEntry, MultiplePartHeader, NaiveEntry};
use store_api::logstore::provider::{KafkaProvider, Provider};
use store_api::storage::RegionId;
use crate::error::{
DecodeJsonSnafu, EncodeJsonSnafu, IllegalSequenceSnafu, MetaLengthExceededLimitSnafu,
MissingKeySnafu, MissingValueSnafu, Result,
};
use crate::kafka::{EntryId, NamespaceImpl};
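/// The version of the record payload format, stored in every [RecordMeta].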
pub(crate) const VERSION: u32 = 0;
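/// Estimated upper bound, in bytes, of a serialized [RecordMeta]. Encoding a record whose
/// serialized meta reaches this limit fails with a `MetaLengthExceededLimit` error.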
pub(crate) const ESTIMATED_META_SIZE: usize = 256;
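/// The type of a [Record].
///
/// A [NaiveEntry] is carried by a single `Full` record, while each part of a
/// [MultiplePartEntry] is carried by a `First`, `Middle(seq)`, or `Last` record,
/// mirroring the part's [MultiplePartHeader].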
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
pub enum RecordType {
Full,
First,
Middle(usize),
Last,
}
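/// The metadata of a [Record], serialized as JSON into the key of the Kafka record.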
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct RecordMeta {
version: u32,
pub tp: RecordType,
pub entry_id: EntryId,
pub ns: NamespaceImpl,
}
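/// The unit written to and read back from Kafka.
///
/// The `meta` goes into the Kafka record key and the `data` into the Kafka record value.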
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct Record {
pub(crate) meta: RecordMeta,
data: Vec<u8>,
}
impl TryFrom<Record> for KafkaRecord {
type Error = crate::error::Error;
fn try_from(record: Record) -> Result<Self> {
let key = serde_json::to_vec(&record.meta).context(EncodeJsonSnafu)?;
ensure!(
key.len() < ESTIMATED_META_SIZE,
MetaLengthExceededLimitSnafu {
limit: ESTIMATED_META_SIZE,
actual: key.len()
}
);
Ok(KafkaRecord {
key: Some(key),
value: Some(record.data),
timestamp: chrono::Utc::now(),
headers: Default::default(),
})
}
}
impl TryFrom<KafkaRecord> for Record {
type Error = crate::error::Error;
fn try_from(kafka_record: KafkaRecord) -> Result<Self> {
let key = kafka_record.key.context(MissingKeySnafu)?;
let meta = serde_json::from_slice(&key).context(DecodeJsonSnafu)?;
let data = kafka_record.value.context(MissingValueSnafu)?;
Ok(Self { meta, data })
}
}
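/// Converts an [Entry] into Kafka records: a [NaiveEntry] becomes a single `Full` record,
/// and each part of a [MultiplePartEntry] becomes one record of the matching type.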
pub(crate) fn convert_to_kafka_records(entry: Entry) -> Result<Vec<KafkaRecord>> {
match entry {
Entry::Naive(entry) => Ok(vec![KafkaRecord::try_from(Record {
meta: RecordMeta {
version: VERSION,
tp: RecordType::Full,
entry_id: 0,
ns: NamespaceImpl {
region_id: entry.region_id.as_u64(),
topic: String::new(),
},
},
data: entry.data,
})?]),
Entry::MultiplePart(entry) => {
let mut entries = Vec::with_capacity(entry.parts.len());
for (idx, part) in entry.parts.into_iter().enumerate() {
let tp = match entry.headers[idx] {
MultiplePartHeader::First => RecordType::First,
MultiplePartHeader::Middle(i) => RecordType::Middle(i),
MultiplePartHeader::Last => RecordType::Last,
};
entries.push(KafkaRecord::try_from(Record {
meta: RecordMeta {
version: VERSION,
tp,
entry_id: 0,
ns: NamespaceImpl {
region_id: entry.region_id.as_u64(),
topic: String::new(),
},
},
data: part,
})?)
}
Ok(entries)
}
}
}
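/// Converts a decoded record (a `Full` record in practice) into a [NaiveEntry].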
fn convert_to_naive_entry(provider: Arc<KafkaProvider>, record: Record) -> Entry {
let region_id = RegionId::from_u64(record.meta.ns.region_id);
Entry::Naive(NaiveEntry {
provider: Provider::Kafka(provider),
region_id,
entry_id: record.meta.entry_id,
data: record.data,
})
}
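/// Assembles buffered `First`/`Middle`/`Last` records into a [MultiplePartEntry] for `region_id`.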
fn convert_to_multiple_entry(
provider: Arc<KafkaProvider>,
region_id: RegionId,
records: Vec<Record>,
) -> Entry {
let mut headers = Vec::with_capacity(records.len());
let mut parts = Vec::with_capacity(records.len());
for record in records {
let header = match record.meta.tp {
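            // `Full` records are never buffered, so they never reach this function.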
RecordType::Full => unreachable!(),
RecordType::First => MultiplePartHeader::First,
RecordType::Middle(i) => MultiplePartHeader::Middle(i),
RecordType::Last => MultiplePartHeader::Last,
};
headers.push(header);
parts.push(record.data);
}
Entry::MultiplePart(MultiplePartEntry {
provider: Provider::Kafka(provider),
region_id,
entry_id: 0,
headers,
parts,
})
}
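/// Drains `buffered_records` and returns the remaining (possibly incomplete) entries,
/// or `None` if nothing is buffered.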
pub fn remaining_entries(
provider: &Arc<KafkaProvider>,
buffered_records: &mut HashMap<RegionId, Vec<Record>>,
) -> Option<Vec<Entry>> {
if buffered_records.is_empty() {
None
} else {
let mut entries = Vec::with_capacity(buffered_records.len());
for (region_id, records) in buffered_records.drain() {
entries.push(convert_to_multiple_entry(
provider.clone(),
region_id,
records,
));
}
Some(entries)
}
}
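/// Feeds a decoded [Record] into the per-region buffer and emits an [Entry] when possible.
///
/// - `Full`: emitted immediately as a [NaiveEntry].
/// - `First`: starts buffering a new entry; a previously buffered (incomplete) entry for the
///   same region is flushed and returned.
/// - `Middle(seq)`: appended to the buffer once its sequence is validated against the
///   previous record.
/// - `Last`: completes and returns the buffered entry, or an entry containing only the
///   `Last` part if nothing was buffered.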
pub(crate) fn maybe_emit_entry(
provider: &Arc<KafkaProvider>,
record: Record,
buffered_records: &mut HashMap<RegionId, Vec<Record>>,
) -> Result<Option<Entry>> {
let mut entry = None;
match record.meta.tp {
RecordType::Full => entry = Some(convert_to_naive_entry(provider.clone(), record)),
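        // A `First` record starts buffering a new entry. If an incomplete entry was already
        // buffered for this region, flush it out as an incomplete multiple-part entry.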
RecordType::First => {
let region_id = record.meta.ns.region_id.into();
if let Some(records) = buffered_records.insert(region_id, vec![record]) {
entry = Some(convert_to_multiple_entry(
provider.clone(),
region_id,
records,
))
}
}
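        // A `Middle` record must directly follow a `First` (seq == 1) or the previous
        // `Middle` (seq == last_seq + 1); otherwise the sequence is illegal.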
RecordType::Middle(seq) => {
let region_id = record.meta.ns.region_id.into();
let records = buffered_records.entry(region_id).or_default();
if !records.is_empty() {
let last_record = records.last().unwrap();
let legal = match last_record.meta.tp {
RecordType::First => seq == 1,
RecordType::Middle(last_seq) => last_seq + 1 == seq,
_ => false,
};
ensure!(
legal,
IllegalSequenceSnafu {
error: format!(
"Illegal sequence of a middle record, last record: {:?}, incoming record: {:?}",
last_record.meta.tp,
record.meta.tp
)
}
);
}
records.push(record);
}
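        // A `Last` record completes the buffered entry for this region; if nothing was
        // buffered, an entry containing only this part is emitted.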
RecordType::Last => {
let region_id = record.meta.ns.region_id.into();
            if let Some(mut records) = buffered_records.remove(&region_id) {
records.push(record);
entry = Some(convert_to_multiple_entry(
provider.clone(),
region_id,
records,
))
} else {
entry = Some(convert_to_multiple_entry(
provider.clone(),
region_id,
vec![record],
))
}
}
}
Ok(entry)
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::sync::Arc;
use super::*;
use crate::error;
fn new_test_record(tp: RecordType, entry_id: EntryId, region_id: u64, data: Vec<u8>) -> Record {
Record {
meta: RecordMeta {
version: VERSION,
tp,
ns: NamespaceImpl {
region_id,
topic: "greptimedb_wal_topic".to_string(),
},
entry_id,
},
data,
}
}
#[test]
fn test_maybe_emit_entry_emit_naive_entry() {
let provider = Arc::new(KafkaProvider::new("my_topic".to_string()));
let region_id = RegionId::new(1, 1);
let mut buffer = HashMap::new();
let record = new_test_record(RecordType::Full, 1, region_id.as_u64(), vec![1; 100]);
let entry = maybe_emit_entry(&provider, record, &mut buffer)
.unwrap()
.unwrap();
assert_eq!(
entry,
Entry::Naive(NaiveEntry {
provider: Provider::Kafka(provider),
region_id,
entry_id: 1,
data: vec![1; 100]
})
);
}
#[test]
fn test_maybe_emit_entry_emit_incomplete_entry() {
let provider = Arc::new(KafkaProvider::new("my_topic".to_string()));
let region_id = RegionId::new(1, 1);
let mut buffer = HashMap::new();
let record = new_test_record(RecordType::First, 1, region_id.as_u64(), vec![1; 100]);
assert!(maybe_emit_entry(&provider, record, &mut buffer)
.unwrap()
.is_none());
let record = new_test_record(RecordType::First, 2, region_id.as_u64(), vec![2; 100]);
let incomplete_entry = maybe_emit_entry(&provider, record, &mut buffer)
.unwrap()
.unwrap();
assert_eq!(
incomplete_entry,
Entry::MultiplePart(MultiplePartEntry {
provider: Provider::Kafka(provider.clone()),
region_id,
entry_id: 0,
headers: vec![MultiplePartHeader::First],
parts: vec![vec![1; 100]],
})
);
let mut buffer = HashMap::new();
let record = new_test_record(RecordType::Last, 1, region_id.as_u64(), vec![1; 100]);
let incomplete_entry = maybe_emit_entry(&provider, record, &mut buffer)
.unwrap()
.unwrap();
assert_eq!(
incomplete_entry,
Entry::MultiplePart(MultiplePartEntry {
provider: Provider::Kafka(provider.clone()),
region_id,
entry_id: 0,
headers: vec![MultiplePartHeader::Last],
parts: vec![vec![1; 100]],
})
);
let mut buffer = HashMap::new();
let record = new_test_record(RecordType::Middle(0), 1, region_id.as_u64(), vec![1; 100]);
assert!(maybe_emit_entry(&provider, record, &mut buffer)
.unwrap()
.is_none());
let record = new_test_record(RecordType::First, 2, region_id.as_u64(), vec![2; 100]);
let incomplete_entry = maybe_emit_entry(&provider, record, &mut buffer)
.unwrap()
.unwrap();
assert_eq!(
incomplete_entry,
Entry::MultiplePart(MultiplePartEntry {
provider: Provider::Kafka(provider),
region_id,
entry_id: 0,
headers: vec![MultiplePartHeader::Middle(0)],
parts: vec![vec![1; 100]],
})
);
}
#[test]
fn test_maybe_emit_entry_illegal_seq() {
let provider = Arc::new(KafkaProvider::new("my_topic".to_string()));
let region_id = RegionId::new(1, 1);
let mut buffer = HashMap::new();
let record = new_test_record(RecordType::First, 1, region_id.as_u64(), vec![1; 100]);
assert!(maybe_emit_entry(&provider, record, &mut buffer)
.unwrap()
.is_none());
let record = new_test_record(RecordType::Middle(2), 1, region_id.as_u64(), vec![2; 100]);
let err = maybe_emit_entry(&provider, record, &mut buffer).unwrap_err();
assert_matches!(err, error::Error::IllegalSequence { .. });
let mut buffer = HashMap::new();
let record = new_test_record(RecordType::First, 1, region_id.as_u64(), vec![1; 100]);
assert!(maybe_emit_entry(&provider, record, &mut buffer)
.unwrap()
.is_none());
let record = new_test_record(RecordType::Middle(1), 1, region_id.as_u64(), vec![2; 100]);
assert!(maybe_emit_entry(&provider, record, &mut buffer)
.unwrap()
.is_none());
let record = new_test_record(RecordType::Middle(3), 1, region_id.as_u64(), vec![2; 100]);
let err = maybe_emit_entry(&provider, record, &mut buffer).unwrap_err();
assert_matches!(err, error::Error::IllegalSequence { .. });
}
#[test]
fn test_meta_size() {
let meta = RecordMeta {
version: VERSION,
tp: RecordType::Middle(usize::MAX),
entry_id: u64::MAX,
ns: NamespaceImpl {
region_id: RegionId::new(u32::MAX, u32::MAX).as_u64(),
topic: format!("greptime_kafka_cluster/1024/2048/{}", uuid::Uuid::new_v4()),
},
};
let serialized = serde_json::to_vec(&meta).unwrap();
assert!(serialized.len() < ESTIMATED_META_SIZE);
}
}