Skip to main content

mito2/wal/
encoder.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! A cached-size, single-pass encoder for [`WalEntry`].
16//!
17//! # Why
18//!
19//! `prost` does not cache message sizes the way protobuf-C++ does. When
20//! `WalEntry::encode_to_vec` runs, the encode pass recomputes the
21//! `encoded_len` of every nested message each time it needs a length
22//! delimiter. For the deep WAL tree
23//! (`WalEntry → Mutation → Rows → Row → Value`) the length of a leaf `Value`
24//! ends up being recomputed roughly once per ancestor level — i.e. ~5 times.
25//! Microbenchmarks show ~87% of `encode_to_vec` is spent in these repeated
26//! length walks, not in writing bytes.
27//!
28//! # How
29//!
30//! This encoder walks the tree once to compute and cache the body length of
31//! every length-delimited message node (in pre-order into a flat `sizes`
32//! vector), then walks it a second time to write bytes, reading each cached
33//! length back via a cursor. Every node's length is computed exactly once.
34//!
35//! Leaf messages whose sizing is not recursively redundant (`ColumnSchema`,
36//! `Value`, `WriteHint`, `BulkWalEntry`) are delegated to prost's own
37//! `encoded_len`/`encode_raw`, but their (single) computed length is still
38//! cached so the encode pass never recomputes it.
39//!
40//! The output is byte-for-byte identical to `WalEntry::encode_to_vec`; this is
41//! asserted in tests and must hold to preserve WAL replay compatibility.
42//!
43//! # Maintenance
44//!
45//! This encoder hard-codes the wire layout (field tags and field order) of
46//! `WalEntry`, `Mutation`, `Rows` and `Row`. If any of these messages change in
47//! greptime-proto, this file MUST be updated to match:
48//! - **Adding or removing a field** is caught at compile time: every one of
49//!   these messages is destructured exhaustively (no `..`), so a changed field
50//!   set fails to compile here.
51//! - **Changing a field's tag number or type** is caught by the byte-for-byte
52//!   equality tests against prost (which populate all fields). Keep those tests
53//!   exhaustive when adding fields.
54//!
55//! Leaf messages are delegated to prost, so changes to them need no update here.
56
57use api::v1::{Mutation, Row, Rows, Value, WalEntry};
58use prost::Message;
59use prost::encoding::{WireType, encode_key, encode_varint, encoded_len_varint, key_len};
60
// Wire field numbers of the hand-encoded messages. These MUST match the
// greptime-proto definitions exactly, or the output diverges from prost.

/// Field number of `WalEntry.mutations`.
const MUTATION_TAG: u32 = 1;
/// Field number of `WalEntry.bulk_entries`.
const BULK_ENTRY_TAG: u32 = 2;

/// Field number of `Mutation.op_type`.
const OP_TYPE_TAG: u32 = 1;
/// Field number of `Mutation.sequence`.
const SEQUENCE_TAG: u32 = 2;
/// Field number of `Mutation.rows`.
const ROWS_TAG: u32 = 3;
/// Field number of `Mutation.write_hint`.
const WRITE_HINT_TAG: u32 = 4;

/// Field number of `Rows.schema`.
const SCHEMA_TAG: u32 = 1;
/// Field number of `Rows.rows`.
const ROW_TAG: u32 = 2;

/// Field number of `Row.values`.
const VALUE_TAG: u32 = 1;
71
72/// Length contribution of a length-delimited message field:
73/// key + length varint + body.
74#[inline]
75fn msg_field_len(tag: u32, body_len: usize) -> usize {
76    key_len(tag) + encoded_len_varint(body_len as u64) + body_len
77}
78
/// Two-pass [`WalEntry`] encoder that remembers every nested message's body
/// length between its sizing walk and its writing walk.
///
/// Keeping one instance around lets repeated encodes reuse the size cache's
/// allocation.
#[derive(Default)]
pub struct WalEntryEncoder {
    /// Message body lengths in pre-order: filled by the size pass and read
    /// back in the same order by the encode pass.
    sizes: Vec<usize>,
}
86
87impl WalEntryEncoder {
88    pub fn new() -> Self {
89        Self::default()
90    }
91
92    /// Encodes `entry` to a new `Vec<u8>`, byte-for-byte identical to
93    /// `entry.encode_to_vec()`.
94    pub fn encode_to_vec(&mut self, entry: &WalEntry) -> Vec<u8> {
95        self.sizes.clear();
96        let body_len = self.size_entry(entry);
97        let mut buf = Vec::with_capacity(body_len);
98        let mut cursor = 0;
99        self.encode_entry(entry, &mut buf, &mut cursor);
100        // Invariants of the two-pass design. Kept as `debug_assert` to avoid any
101        // overhead on the hot write path; correctness is covered by the
102        // byte-for-byte equality tests against prost.
103        debug_assert_eq!(
104            cursor,
105            self.sizes.len(),
106            "cursor did not consume all cached sizes"
107        );
108        debug_assert_eq!(buf.len(), body_len, "encoded length mismatch");
109        buf
110    }
111
112    /// Reserves a size slot in pre-order and returns its index.
113    #[inline]
114    fn reserve(&mut self) -> usize {
115        let idx = self.sizes.len();
116        self.sizes.push(0);
117        idx
118    }
119
120    // ---- size pass ----------------------------------------------------------
121
122    /// Returns the body length of the `WalEntry` (no length delimiter; it is
123    /// the root). Pushes cached slots for all nested message nodes.
124    fn size_entry(&mut self, entry: &WalEntry) -> usize {
125        // Exhaustive destructure (no `..`): adding a field to `WalEntry` in
126        // greptime-proto makes this fail to compile, forcing this encoder to be
127        // updated rather than silently dropping the new field from the WAL.
128        let WalEntry {
129            mutations,
130            bulk_entries,
131        } = entry;
132        let mut body = 0;
133        for m in mutations {
134            let mb = self.size_mutation(m);
135            body += msg_field_len(MUTATION_TAG, mb);
136        }
137        for be in bulk_entries {
138            // Leaf: delegate sizing to prost, cache the result.
139            let bl = be.encoded_len();
140            let slot = self.reserve();
141            self.sizes[slot] = bl;
142            body += msg_field_len(BULK_ENTRY_TAG, bl);
143        }
144        body
145    }
146
147    fn size_mutation(&mut self, m: &Mutation) -> usize {
148        // Exhaustive destructure: see note in `size_entry`.
149        let Mutation {
150            op_type,
151            sequence,
152            rows,
153            write_hint,
154        } = m;
155        let slot = self.reserve();
156        let mut body = 0;
157        // proto3 scalars are only encoded when non-default.
158        if *op_type != 0 {
159            body += key_len(OP_TYPE_TAG) + encoded_len_varint(*op_type as u64);
160        }
161        if *sequence != 0 {
162            body += key_len(SEQUENCE_TAG) + encoded_len_varint(*sequence);
163        }
164        if let Some(rows) = rows {
165            let rb = self.size_rows(rows);
166            body += msg_field_len(ROWS_TAG, rb);
167        }
168        if let Some(hint) = write_hint {
169            let hl = hint.encoded_len();
170            let s = self.reserve();
171            self.sizes[s] = hl;
172            body += msg_field_len(WRITE_HINT_TAG, hl);
173        }
174        self.sizes[slot] = body;
175        body
176    }
177
178    fn size_rows(&mut self, rows: &Rows) -> usize {
179        // Exhaustive destructure: see note in `size_entry`.
180        let Rows { schema, rows } = rows;
181        let slot = self.reserve();
182        let mut body = 0;
183        for col in schema {
184            let cl = col.encoded_len();
185            let s = self.reserve();
186            self.sizes[s] = cl;
187            body += msg_field_len(SCHEMA_TAG, cl);
188        }
189        for row in rows {
190            let rb = self.size_row(row);
191            body += msg_field_len(ROW_TAG, rb);
192        }
193        self.sizes[slot] = body;
194        body
195    }
196
197    fn size_row(&mut self, row: &Row) -> usize {
198        // Exhaustive destructure: see note in `size_entry`.
199        let Row { values } = row;
200        let slot = self.reserve();
201        let mut body = 0;
202        for value in values {
203            let vl = self.size_value(value);
204            body += msg_field_len(VALUE_TAG, vl);
205        }
206        self.sizes[slot] = body;
207        body
208    }
209
210    #[inline]
211    fn size_value(&mut self, value: &Value) -> usize {
212        // Value is a leaf message (a single oneof); its `encoded_len` is not
213        // recursively redundant, but we cache it so the encode pass does not
214        // recompute it.
215        let vl = value.encoded_len();
216        let slot = self.reserve();
217        self.sizes[slot] = vl;
218        vl
219    }
220
221    // ---- encode pass --------------------------------------------------------
222
223    #[inline]
224    fn next_size(&self, cursor: &mut usize) -> usize {
225        let v = *self
226            .sizes
227            .get(*cursor)
228            .expect("size pass and encode pass disagree: cursor out of bounds (encoder bug)");
229        *cursor += 1;
230        v
231    }
232
233    // Convention: the caller reads a child message's cached body length (via
234    // `next_size`, consuming that child's slot in pre-order) and writes the
235    // key + length delimiter; the callee then writes only the body.
236
237    fn encode_entry(&self, entry: &WalEntry, buf: &mut Vec<u8>, cursor: &mut usize) {
238        // Exhaustive destructure: see note in `size_entry`.
239        let WalEntry {
240            mutations,
241            bulk_entries,
242        } = entry;
243        for m in mutations {
244            let mb = self.next_size(cursor);
245            encode_key(MUTATION_TAG, WireType::LengthDelimited, buf);
246            encode_varint(mb as u64, buf);
247            self.encode_mutation_body(m, buf, cursor);
248        }
249        for be in bulk_entries {
250            let bl = self.next_size(cursor);
251            encode_key(BULK_ENTRY_TAG, WireType::LengthDelimited, buf);
252            encode_varint(bl as u64, buf);
253            be.encode_raw(buf);
254        }
255    }
256
257    fn encode_mutation_body(&self, m: &Mutation, buf: &mut Vec<u8>, cursor: &mut usize) {
258        // Exhaustive destructure: see note in `size_entry`.
259        let Mutation {
260            op_type,
261            sequence,
262            rows,
263            write_hint,
264        } = m;
265        if *op_type != 0 {
266            encode_key(OP_TYPE_TAG, WireType::Varint, buf);
267            encode_varint(*op_type as u64, buf);
268        }
269        if *sequence != 0 {
270            encode_key(SEQUENCE_TAG, WireType::Varint, buf);
271            encode_varint(*sequence, buf);
272        }
273        if let Some(rows) = rows {
274            let rb = self.next_size(cursor);
275            encode_key(ROWS_TAG, WireType::LengthDelimited, buf);
276            encode_varint(rb as u64, buf);
277            self.encode_rows_body(rows, buf, cursor);
278        }
279        if let Some(hint) = write_hint {
280            let hl = self.next_size(cursor);
281            encode_key(WRITE_HINT_TAG, WireType::LengthDelimited, buf);
282            encode_varint(hl as u64, buf);
283            hint.encode_raw(buf);
284        }
285    }
286
287    fn encode_rows_body(&self, rows: &Rows, buf: &mut Vec<u8>, cursor: &mut usize) {
288        // Exhaustive destructure: see note in `size_entry`.
289        let Rows { schema, rows } = rows;
290        for col in schema {
291            let cl = self.next_size(cursor);
292            encode_key(SCHEMA_TAG, WireType::LengthDelimited, buf);
293            encode_varint(cl as u64, buf);
294            col.encode_raw(buf);
295        }
296        for row in rows {
297            let rb = self.next_size(cursor);
298            encode_key(ROW_TAG, WireType::LengthDelimited, buf);
299            encode_varint(rb as u64, buf);
300            self.encode_row_body(row, buf, cursor);
301        }
302    }
303
304    fn encode_row_body(&self, row: &Row, buf: &mut Vec<u8>, cursor: &mut usize) {
305        // Exhaustive destructure: see note in `size_entry`.
306        let Row { values } = row;
307        for value in values {
308            let vl = self.next_size(cursor);
309            encode_key(VALUE_TAG, WireType::LengthDelimited, buf);
310            encode_varint(vl as u64, buf);
311            value.encode_raw(buf);
312        }
313    }
314}
315
#[cfg(test)]
mod tests {
    use api::v1::value::ValueData;
    use api::v1::{
        BulkWalEntry, ColumnDataType, ColumnSchema, Mutation, OpType, Row, Rows, SemanticType,
        Value, WalEntry, WriteHint,
    };
    use prost::Message;

    use super::*;

    fn col(name: &str, dt: ColumnDataType, st: SemanticType) -> ColumnSchema {
        ColumnSchema {
            column_name: name.to_string(),
            datatype: dt as i32,
            semantic_type: st as i32,
            datatype_extension: None,
            options: None,
        }
    }

    fn sample_rows(num_rows: usize, with_null: bool) -> Rows {
        let schema = vec![
            col("host", ColumnDataType::String, SemanticType::Tag),
            col(
                "ts",
                ColumnDataType::TimestampMillisecond,
                SemanticType::Timestamp,
            ),
            col("v", ColumnDataType::Float64, SemanticType::Field),
        ];
        let rows = (0..num_rows)
            .map(|i| Row {
                values: vec![
                    Value {
                        value_data: Some(ValueData::StringValue(format!("h{i}"))),
                    },
                    Value {
                        value_data: Some(ValueData::TimestampMillisecondValue(i as i64)),
                    },
                    // Exercise the null (None oneof) path.
                    Value {
                        value_data: if with_null && i % 2 == 0 {
                            None
                        } else {
                            Some(ValueData::F64Value(i as f64))
                        },
                    },
                ],
            })
            .collect();
        Rows { schema, rows }
    }

    fn assert_byte_identical(entry: &WalEntry) {
        let expected = entry.encode_to_vec();
        let actual = WalEntryEncoder::new().encode_to_vec(entry);
        assert_eq!(
            expected,
            actual,
            "fast encoder output differs from prost (len {} vs {})",
            expected.len(),
            actual.len()
        );
    }

    #[test]
    fn test_matches_prost_basic() {
        let entry = WalEntry {
            mutations: vec![Mutation {
                op_type: OpType::Put as i32,
                sequence: 42,
                rows: Some(sample_rows(5, false)),
                write_hint: None,
            }],
            bulk_entries: vec![],
        };
        assert_byte_identical(&entry);
    }

    #[test]
    fn test_matches_prost_multi_mutation_with_nulls_and_delete() {
        let entry = WalEntry {
            mutations: vec![
                Mutation {
                    op_type: OpType::Put as i32,
                    sequence: 1,
                    rows: Some(sample_rows(3, true)),
                    write_hint: Some(WriteHint {
                        primary_key_encoding: 1,
                    }),
                },
                // op_type Delete == 0: must be skipped as proto3 default.
                Mutation {
                    op_type: OpType::Delete as i32,
                    sequence: 4,
                    rows: Some(sample_rows(2, true)),
                    write_hint: None,
                },
            ],
            bulk_entries: vec![],
        };
        assert_byte_identical(&entry);
    }

    #[test]
    fn test_matches_prost_empty_and_edge() {
        // Empty entry.
        assert_byte_identical(&WalEntry::default());
        // Mutation with sequence 0 (default, skipped) and empty rows.
        let entry = WalEntry {
            mutations: vec![Mutation {
                op_type: OpType::Put as i32,
                sequence: 0,
                rows: Some(Rows {
                    schema: vec![],
                    rows: vec![],
                }),
                write_hint: None,
            }],
            bulk_entries: vec![],
        };
        assert_byte_identical(&entry);
    }

    /// Mutations without `rows` (with and without a `write_hint`) change the
    /// pre-order slot layout; the encode pass must still line up with it.
    #[test]
    fn test_matches_prost_mutation_without_rows() {
        let entry = WalEntry {
            mutations: vec![
                Mutation {
                    op_type: OpType::Put as i32,
                    sequence: 3,
                    rows: None,
                    write_hint: Some(WriteHint {
                        primary_key_encoding: 1,
                    }),
                },
                Mutation {
                    op_type: OpType::Put as i32,
                    sequence: 4,
                    rows: None,
                    write_hint: None,
                },
            ],
            bulk_entries: vec![],
        };
        assert_byte_identical(&entry);
    }

    /// A negative `op_type` exercises the `i32 as u64` sign-extension path,
    /// and `u64::MAX` the longest possible `sequence` varint; both must match
    /// prost exactly.
    #[test]
    fn test_matches_prost_negative_op_type_and_max_sequence() {
        let entry = WalEntry {
            mutations: vec![Mutation {
                op_type: -1,
                sequence: u64::MAX,
                rows: Some(sample_rows(1, false)),
                write_hint: None,
            }],
            bulk_entries: vec![],
        };
        assert_byte_identical(&entry);
    }

    #[test]
    fn test_reused_encoder_matches() {
        let mut enc = WalEntryEncoder::new();
        for n in [1usize, 10, 100] {
            let entry = WalEntry {
                mutations: vec![Mutation {
                    op_type: OpType::Put as i32,
                    sequence: n as u64,
                    rows: Some(sample_rows(n, true)),
                    write_hint: None,
                }],
                bulk_entries: vec![],
            };
            assert_eq!(entry.encode_to_vec(), enc.encode_to_vec(&entry));
        }
    }

    /// Reusing one encoder on progressively smaller entries must not carry
    /// stale cached sizes over from a previous, larger call.
    #[test]
    fn test_reused_encoder_shrinking_entries() {
        let mut enc = WalEntryEncoder::new();
        for n in [100usize, 10, 1, 0] {
            let entry = WalEntry {
                mutations: vec![Mutation {
                    op_type: OpType::Put as i32,
                    sequence: n as u64,
                    rows: Some(sample_rows(n, true)),
                    write_hint: None,
                }],
                bulk_entries: vec![],
            };
            assert_eq!(entry.encode_to_vec(), enc.encode_to_vec(&entry));
        }
    }

    /// Exercises the `bulk_entries` path (the other branch of `encode_entry`),
    /// which the other tests leave empty.
    #[test]
    fn test_matches_prost_with_bulk_entries() {
        let entry = WalEntry {
            mutations: vec![Mutation {
                op_type: OpType::Put as i32,
                sequence: 7,
                rows: Some(sample_rows(2, false)),
                write_hint: None,
            }],
            bulk_entries: vec![
                BulkWalEntry {
                    sequence: 100,
                    max_ts: 200,
                    min_ts: 50,
                    timestamp_index: 3,
                    body: None,
                },
                BulkWalEntry {
                    sequence: 101,
                    max_ts: 0, // default, must be skipped
                    min_ts: 0, // default, must be skipped
                    timestamp_index: 0,
                    body: None,
                },
            ],
        };
        assert_byte_identical(&entry);
    }

    /// Populates every field of the hand-rolled messages (mutations + bulk
    /// entries, rows with schema and rows, write_hint set) so the byte-equality
    /// check guards against tag/type changes, not just field additions (which
    /// the exhaustive destructure already catches at compile time).
    #[test]
    fn test_matches_prost_all_fields_populated() {
        let entry = WalEntry {
            mutations: vec![Mutation {
                op_type: OpType::Put as i32,
                sequence: 12345,
                rows: Some(sample_rows(4, true)),
                write_hint: Some(WriteHint {
                    primary_key_encoding: 1,
                }),
            }],
            bulk_entries: vec![BulkWalEntry {
                sequence: 999,
                max_ts: 1_700_000_000_000,
                min_ts: 1_699_000_000_000,
                timestamp_index: 2,
                body: None,
            }],
        };
        assert_byte_identical(&entry);
    }
}