1use api::v1::{Mutation, Row, Rows, Value, WalEntry};
58use prost::Message;
59use prost::encoding::{WireType, encode_key, encode_varint, encoded_len_varint, key_len};
60
61const MUTATION_TAG: u32 = 1; const BULK_ENTRY_TAG: u32 = 2; const OP_TYPE_TAG: u32 = 1; const SEQUENCE_TAG: u32 = 2; const ROWS_TAG: u32 = 3; const WRITE_HINT_TAG: u32 = 4; const SCHEMA_TAG: u32 = 1; const ROW_TAG: u32 = 2; const VALUE_TAG: u32 = 1; #[inline]
75fn msg_field_len(tag: u32, body_len: usize) -> usize {
76 key_len(tag) + encoded_len_varint(body_len as u64) + body_len
77}
78
/// Reusable encoder for [`WalEntry`] that caches the lengths of nested messages.
///
/// Encoding runs in two passes. The size pass walks the entry and records the body length
/// of every length-delimited field in `sizes`. The encode pass then walks the entry again
/// and replays those lengths, in the same order, to write each length prefix.
///
/// Keeping one encoder around across calls reuses the `sizes` allocation.
#[derive(Debug, Default)]
pub struct WalEntryEncoder {
    /// Body lengths of length-delimited fields, in the order the encode pass consumes them.
    sizes: Vec<usize>,
}
86
87impl WalEntryEncoder {
88 pub fn new() -> Self {
89 Self::default()
90 }
91
92 pub fn encode_to_vec(&mut self, entry: &WalEntry) -> Vec<u8> {
95 self.sizes.clear();
96 let body_len = self.size_entry(entry);
97 let mut buf = Vec::with_capacity(body_len);
98 let mut cursor = 0;
99 self.encode_entry(entry, &mut buf, &mut cursor);
100 debug_assert_eq!(
104 cursor,
105 self.sizes.len(),
106 "cursor did not consume all cached sizes"
107 );
108 debug_assert_eq!(buf.len(), body_len, "encoded length mismatch");
109 buf
110 }
111
112 #[inline]
114 fn reserve(&mut self) -> usize {
115 let idx = self.sizes.len();
116 self.sizes.push(0);
117 idx
118 }
119
120 fn size_entry(&mut self, entry: &WalEntry) -> usize {
125 let WalEntry {
129 mutations,
130 bulk_entries,
131 } = entry;
132 let mut body = 0;
133 for m in mutations {
134 let mb = self.size_mutation(m);
135 body += msg_field_len(MUTATION_TAG, mb);
136 }
137 for be in bulk_entries {
138 let bl = be.encoded_len();
140 let slot = self.reserve();
141 self.sizes[slot] = bl;
142 body += msg_field_len(BULK_ENTRY_TAG, bl);
143 }
144 body
145 }
146
147 fn size_mutation(&mut self, m: &Mutation) -> usize {
148 let Mutation {
150 op_type,
151 sequence,
152 rows,
153 write_hint,
154 } = m;
155 let slot = self.reserve();
156 let mut body = 0;
157 if *op_type != 0 {
159 body += key_len(OP_TYPE_TAG) + encoded_len_varint(*op_type as u64);
160 }
161 if *sequence != 0 {
162 body += key_len(SEQUENCE_TAG) + encoded_len_varint(*sequence);
163 }
164 if let Some(rows) = rows {
165 let rb = self.size_rows(rows);
166 body += msg_field_len(ROWS_TAG, rb);
167 }
168 if let Some(hint) = write_hint {
169 let hl = hint.encoded_len();
170 let s = self.reserve();
171 self.sizes[s] = hl;
172 body += msg_field_len(WRITE_HINT_TAG, hl);
173 }
174 self.sizes[slot] = body;
175 body
176 }
177
178 fn size_rows(&mut self, rows: &Rows) -> usize {
179 let Rows { schema, rows } = rows;
181 let slot = self.reserve();
182 let mut body = 0;
183 for col in schema {
184 let cl = col.encoded_len();
185 let s = self.reserve();
186 self.sizes[s] = cl;
187 body += msg_field_len(SCHEMA_TAG, cl);
188 }
189 for row in rows {
190 let rb = self.size_row(row);
191 body += msg_field_len(ROW_TAG, rb);
192 }
193 self.sizes[slot] = body;
194 body
195 }
196
197 fn size_row(&mut self, row: &Row) -> usize {
198 let Row { values } = row;
200 let slot = self.reserve();
201 let mut body = 0;
202 for value in values {
203 let vl = self.size_value(value);
204 body += msg_field_len(VALUE_TAG, vl);
205 }
206 self.sizes[slot] = body;
207 body
208 }
209
210 #[inline]
211 fn size_value(&mut self, value: &Value) -> usize {
212 let vl = value.encoded_len();
216 let slot = self.reserve();
217 self.sizes[slot] = vl;
218 vl
219 }
220
221 #[inline]
224 fn next_size(&self, cursor: &mut usize) -> usize {
225 let v = *self
226 .sizes
227 .get(*cursor)
228 .expect("size pass and encode pass disagree: cursor out of bounds (encoder bug)");
229 *cursor += 1;
230 v
231 }
232
233 fn encode_entry(&self, entry: &WalEntry, buf: &mut Vec<u8>, cursor: &mut usize) {
238 let WalEntry {
240 mutations,
241 bulk_entries,
242 } = entry;
243 for m in mutations {
244 let mb = self.next_size(cursor);
245 encode_key(MUTATION_TAG, WireType::LengthDelimited, buf);
246 encode_varint(mb as u64, buf);
247 self.encode_mutation_body(m, buf, cursor);
248 }
249 for be in bulk_entries {
250 let bl = self.next_size(cursor);
251 encode_key(BULK_ENTRY_TAG, WireType::LengthDelimited, buf);
252 encode_varint(bl as u64, buf);
253 be.encode_raw(buf);
254 }
255 }
256
257 fn encode_mutation_body(&self, m: &Mutation, buf: &mut Vec<u8>, cursor: &mut usize) {
258 let Mutation {
260 op_type,
261 sequence,
262 rows,
263 write_hint,
264 } = m;
265 if *op_type != 0 {
266 encode_key(OP_TYPE_TAG, WireType::Varint, buf);
267 encode_varint(*op_type as u64, buf);
268 }
269 if *sequence != 0 {
270 encode_key(SEQUENCE_TAG, WireType::Varint, buf);
271 encode_varint(*sequence, buf);
272 }
273 if let Some(rows) = rows {
274 let rb = self.next_size(cursor);
275 encode_key(ROWS_TAG, WireType::LengthDelimited, buf);
276 encode_varint(rb as u64, buf);
277 self.encode_rows_body(rows, buf, cursor);
278 }
279 if let Some(hint) = write_hint {
280 let hl = self.next_size(cursor);
281 encode_key(WRITE_HINT_TAG, WireType::LengthDelimited, buf);
282 encode_varint(hl as u64, buf);
283 hint.encode_raw(buf);
284 }
285 }
286
287 fn encode_rows_body(&self, rows: &Rows, buf: &mut Vec<u8>, cursor: &mut usize) {
288 let Rows { schema, rows } = rows;
290 for col in schema {
291 let cl = self.next_size(cursor);
292 encode_key(SCHEMA_TAG, WireType::LengthDelimited, buf);
293 encode_varint(cl as u64, buf);
294 col.encode_raw(buf);
295 }
296 for row in rows {
297 let rb = self.next_size(cursor);
298 encode_key(ROW_TAG, WireType::LengthDelimited, buf);
299 encode_varint(rb as u64, buf);
300 self.encode_row_body(row, buf, cursor);
301 }
302 }
303
304 fn encode_row_body(&self, row: &Row, buf: &mut Vec<u8>, cursor: &mut usize) {
305 let Row { values } = row;
307 for value in values {
308 let vl = self.next_size(cursor);
309 encode_key(VALUE_TAG, WireType::LengthDelimited, buf);
310 encode_varint(vl as u64, buf);
311 value.encode_raw(buf);
312 }
313 }
314}
315
#[cfg(test)]
mod tests {
    use api::v1::value::ValueData;
    use api::v1::{
        BulkWalEntry, ColumnDataType, ColumnSchema, Mutation, OpType, Row, Rows, SemanticType,
        Value, WalEntry, WriteHint,
    };
    use prost::Message;

    use super::*;

    fn col(name: &str, dt: ColumnDataType, st: SemanticType) -> ColumnSchema {
        ColumnSchema {
            column_name: name.to_string(),
            datatype: dt as i32,
            semantic_type: st as i32,
            datatype_extension: None,
            options: None,
        }
    }

    /// Builds `num_rows` rows over a (host: tag, ts: timestamp, v: f64 field) schema.
    fn sample_rows(num_rows: usize, with_null: bool) -> Rows {
        let schema = vec![
            col("host", ColumnDataType::String, SemanticType::Tag),
            col(
                "ts",
                ColumnDataType::TimestampMillisecond,
                SemanticType::Timestamp,
            ),
            col("v", ColumnDataType::Float64, SemanticType::Field),
        ];
        let rows = (0..num_rows)
            .map(|i| Row {
                values: vec![
                    Value {
                        value_data: Some(ValueData::StringValue(format!("h{i}"))),
                    },
                    Value {
                        value_data: Some(ValueData::TimestampMillisecondValue(i as i64)),
                    },
                    Value {
                        // When `with_null` is set, even rows carry a null `v` (an empty
                        // `Value` message).
                        value_data: if with_null && i % 2 == 0 {
                            None
                        } else {
                            Some(ValueData::F64Value(i as f64))
                        },
                    },
                ],
            })
            .collect();
        Rows { schema, rows }
    }

    /// Asserts the fast encoder produces exactly the bytes prost produces for `entry`.
    fn assert_byte_identical(entry: &WalEntry) {
        let expected = entry.encode_to_vec();
        let actual = WalEntryEncoder::new().encode_to_vec(entry);
        assert_eq!(
            expected,
            actual,
            "fast encoder output differs from prost (len {} vs {})",
            expected.len(),
            actual.len()
        );
    }

    #[test]
    fn test_matches_prost_basic() {
        let entry = WalEntry {
            mutations: vec![Mutation {
                op_type: OpType::Put as i32,
                sequence: 42,
                rows: Some(sample_rows(5, false)),
                write_hint: None,
            }],
            bulk_entries: vec![],
        };
        assert_byte_identical(&entry);
    }

    #[test]
    fn test_matches_prost_multi_mutation_with_nulls_and_delete() {
        let entry = WalEntry {
            mutations: vec![
                Mutation {
                    op_type: OpType::Put as i32,
                    sequence: 1,
                    rows: Some(sample_rows(3, true)),
                    write_hint: Some(WriteHint {
                        primary_key_encoding: 1,
                    }),
                },
                Mutation {
                    op_type: OpType::Delete as i32,
                    sequence: 4,
                    rows: Some(sample_rows(2, true)),
                    write_hint: None,
                },
            ],
            bulk_entries: vec![],
        };
        assert_byte_identical(&entry);
    }

    #[test]
    fn test_matches_prost_empty_and_edge() {
        assert_byte_identical(&WalEntry::default());

        // A present-but-empty `Rows` and a zero sequence.
        let entry = WalEntry {
            mutations: vec![Mutation {
                op_type: OpType::Put as i32,
                sequence: 0,
                rows: Some(Rows {
                    schema: vec![],
                    rows: vec![],
                }),
                write_hint: None,
            }],
            bulk_entries: vec![],
        };
        assert_byte_identical(&entry);
    }

    /// Exercises encodings the other cases miss. These are: a negative (out-of-range) enum
    /// value, encoded as a sign-extended 10-byte varint; `u64::MAX` as the sequence; a
    /// present-but-empty nested message (`WriteHint::default()`); a row with no values; a
    /// value long enough to need a multi-byte length prefix; and an all-default `Mutation`.
    #[test]
    fn test_matches_prost_varint_and_empty_message_edges() {
        let mut rows = sample_rows(1, true);
        rows.rows.push(Row { values: vec![] });
        rows.rows.push(Row {
            values: vec![Value {
                value_data: Some(ValueData::StringValue("x".repeat(300))),
            }],
        });
        let entry = WalEntry {
            mutations: vec![
                Mutation {
                    op_type: -1,
                    sequence: u64::MAX,
                    rows: Some(rows),
                    write_hint: Some(WriteHint::default()),
                },
                Mutation {
                    op_type: 0,
                    sequence: 0,
                    rows: None,
                    write_hint: None,
                },
            ],
            bulk_entries: vec![],
        };
        assert_byte_identical(&entry);
    }

    /// Reusing one encoder across entries of different sizes must not leak cached sizes
    /// from a previous call.
    #[test]
    fn test_reused_encoder_matches() {
        let mut enc = WalEntryEncoder::new();
        for n in [1usize, 10, 100] {
            let entry = WalEntry {
                mutations: vec![Mutation {
                    op_type: OpType::Put as i32,
                    sequence: n as u64,
                    rows: Some(sample_rows(n, true)),
                    write_hint: None,
                }],
                bulk_entries: vec![],
            };
            assert_eq!(entry.encode_to_vec(), enc.encode_to_vec(&entry));
        }
    }

    /// Bulk entries are encoded after all mutations.
    #[test]
    fn test_matches_prost_with_bulk_entries() {
        let entry = WalEntry {
            mutations: vec![Mutation {
                op_type: OpType::Put as i32,
                sequence: 7,
                rows: Some(sample_rows(2, false)),
                write_hint: None,
            }],
            bulk_entries: vec![
                BulkWalEntry {
                    sequence: 100,
                    max_ts: 200,
                    min_ts: 50,
                    timestamp_index: 3,
                    body: None,
                },
                BulkWalEntry {
                    sequence: 101,
                    // Zero-valued scalars exercise default-value handling.
                    max_ts: 0,
                    min_ts: 0,
                    timestamp_index: 0,
                    body: None,
                },
            ],
        };
        assert_byte_identical(&entry);
    }

    /// Every optional part of the entry is populated at once.
    #[test]
    fn test_matches_prost_all_fields_populated() {
        let entry = WalEntry {
            mutations: vec![Mutation {
                op_type: OpType::Put as i32,
                sequence: 12345,
                rows: Some(sample_rows(4, true)),
                write_hint: Some(WriteHint {
                    primary_key_encoding: 1,
                }),
            }],
            bulk_entries: vec![BulkWalEntry {
                sequence: 999,
                max_ts: 1_700_000_000_000,
                min_ts: 1_699_000_000_000,
                timestamp_index: 2,
                body: None,
            }],
        };
        assert_byte_identical(&entry);
    }
}