mito2/
region_write_ctx.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::mem;
16use std::sync::Arc;
17use std::sync::atomic::{AtomicU64, Ordering};
18
19use api::v1::{BulkWalEntry, Mutation, OpType, Rows, WalEntry, WriteHint};
20use futures::stream::{FuturesUnordered, StreamExt};
21use snafu::ResultExt;
22use store_api::logstore::LogStore;
23use store_api::logstore::provider::Provider;
24use store_api::storage::{RegionId, SequenceNumber};
25
26use crate::error::{Error, Result, WriteGroupSnafu};
27use crate::memtable::KeyValues;
28use crate::memtable::bulk::part::BulkPart;
29use crate::metrics;
30use crate::region::version::{VersionControlData, VersionControlRef, VersionRef};
31use crate::request::OptionOutputTx;
32use crate::wal::{EntryId, WalWriter};
33
34/// Notifier to notify write result on drop.
35struct WriteNotify {
36    /// Error to send to the waiter.
37    err: Option<Arc<Error>>,
38    /// Sender to send write result to the waiter for this mutation.
39    sender: OptionOutputTx,
40    /// Number of rows to be written.
41    num_rows: usize,
42}
43
44impl WriteNotify {
45    /// Creates a new notify from the `sender`.
46    fn new(sender: OptionOutputTx, num_rows: usize) -> WriteNotify {
47        WriteNotify {
48            err: None,
49            sender,
50            num_rows,
51        }
52    }
53
54    /// Send result to the waiter.
55    fn notify_result(&mut self) {
56        if let Some(err) = &self.err {
57            // Try to send the error to waiters.
58            self.sender
59                .send_mut(Err(err.clone()).context(WriteGroupSnafu));
60        } else {
61            // Send success result.
62            self.sender.send_mut(Ok(self.num_rows));
63        }
64    }
65}
66
67impl Drop for WriteNotify {
68    fn drop(&mut self) {
69        self.notify_result();
70    }
71}
72
73/// Context to keep region metadata and buffer write requests.
74pub(crate) struct RegionWriteCtx {
75    /// Id of region to write.
76    region_id: RegionId,
77    /// Version of the region while creating the context.
78    version: VersionRef,
79    /// VersionControl of the region.
80    version_control: VersionControlRef,
81    /// Next sequence number to write.
82    ///
83    /// The context assigns a unique sequence number for each row.
84    next_sequence: SequenceNumber,
85    /// Next entry id of WAL to write.
86    next_entry_id: EntryId,
87    /// Valid WAL entry to write.
88    ///
89    /// We keep [WalEntry] instead of mutations to avoid taking mutations
90    /// out of the context to construct the wal entry when we write to the wal.
91    wal_entry: WalEntry,
92    /// Wal options of the region being written to.
93    provider: Provider,
94    /// Notifiers to send write results to waiters.
95    ///
96    /// The i-th notify is for i-th mutation.
97    notifiers: Vec<WriteNotify>,
98    /// Notifiers for bulk requests.
99    bulk_notifiers: Vec<WriteNotify>,
100    /// Pending bulk write requests
101    pub(crate) bulk_parts: Vec<BulkPart>,
102    /// The write operation is failed and we should not write to the mutable memtable.
103    failed: bool,
104
105    // Metrics:
106    /// Rows to put.
107    pub(crate) put_num: usize,
108    /// Rows to delete.
109    pub(crate) delete_num: usize,
110    /// The total bytes written to the region.
111    pub(crate) written_bytes: Option<Arc<AtomicU64>>,
112}
113
114impl RegionWriteCtx {
115    /// Returns an empty context.
116    pub(crate) fn new(
117        region_id: RegionId,
118        version_control: &VersionControlRef,
119        provider: Provider,
120        written_bytes: Option<Arc<AtomicU64>>,
121    ) -> RegionWriteCtx {
122        let VersionControlData {
123            version,
124            committed_sequence,
125            last_entry_id,
126            ..
127        } = version_control.current();
128
129        RegionWriteCtx {
130            region_id,
131            version,
132            version_control: version_control.clone(),
133            next_sequence: committed_sequence + 1,
134            next_entry_id: last_entry_id + 1,
135            wal_entry: WalEntry::default(),
136            provider,
137            notifiers: Vec::new(),
138            bulk_notifiers: vec![],
139            failed: false,
140            put_num: 0,
141            delete_num: 0,
142            bulk_parts: vec![],
143            written_bytes,
144        }
145    }
146
147    /// Push mutation to the context.
148    /// This method adopts the sequence number in parameters if present.
149    pub(crate) fn push_mutation(
150        &mut self,
151        op_type: i32,
152        rows: Option<Rows>,
153        write_hint: Option<WriteHint>,
154        tx: OptionOutputTx,
155        sequence: Option<SequenceNumber>,
156    ) {
157        if let Some(sequence) = sequence {
158            self.next_sequence = sequence;
159        }
160        let num_rows = rows.as_ref().map(|rows| rows.rows.len()).unwrap_or(0);
161        self.wal_entry.mutations.push(Mutation {
162            op_type,
163            sequence: self.next_sequence,
164            rows,
165            write_hint,
166        });
167
168        let notify = WriteNotify::new(tx, num_rows);
169        // Notifiers are 1:1 map to mutations.
170        self.notifiers.push(notify);
171
172        // Increase sequence number.
173        self.next_sequence += num_rows as u64;
174
175        // Update metrics.
176        match OpType::try_from(op_type) {
177            Ok(OpType::Delete) => self.delete_num += num_rows,
178            Ok(OpType::Put) => self.put_num += num_rows,
179            Err(_) => (),
180        }
181    }
182
183    /// Encode and add WAL entry to the writer.
184    pub(crate) fn add_wal_entry<S: LogStore>(
185        &mut self,
186        wal_writer: &mut WalWriter<S>,
187    ) -> Result<()> {
188        wal_writer.add_entry(
189            self.region_id,
190            self.next_entry_id,
191            &self.wal_entry,
192            &self.provider,
193        )?;
194        self.next_entry_id += 1;
195        Ok(())
196    }
197
198    pub(crate) fn version(&self) -> &VersionRef {
199        &self.version
200    }
201
202    /// Sets error and marks all write operations are failed.
203    pub(crate) fn set_error(&mut self, err: Arc<Error>) {
204        // Set error for all notifiers
205        for notify in &mut self.notifiers {
206            notify.err = Some(err.clone());
207        }
208
209        // Fail the whole write operation.
210        self.failed = true;
211    }
212
213    /// Updates next entry id.
214    pub(crate) fn set_next_entry_id(&mut self, next_entry_id: EntryId) {
215        self.next_entry_id = next_entry_id
216    }
217
218    /// Consumes mutations and writes them into mutable memtable.
219    pub(crate) async fn write_memtable(&mut self) {
220        debug_assert_eq!(self.notifiers.len(), self.wal_entry.mutations.len());
221
222        if self.failed {
223            return;
224        }
225
226        let mutable_memtable = self.version.memtables.mutable.clone();
227        let prev_memory_usage = if self.written_bytes.is_some() {
228            Some(mutable_memtable.memory_usage())
229        } else {
230            None
231        };
232
233        let mutations = mem::take(&mut self.wal_entry.mutations)
234            .into_iter()
235            .enumerate()
236            .filter_map(|(i, mutation)| {
237                let kvs = KeyValues::new(&self.version.metadata, mutation)?;
238                Some((i, kvs))
239            })
240            .collect::<Vec<_>>();
241
242        if mutations.len() == 1 {
243            if let Err(err) = mutable_memtable.write(&mutations[0].1) {
244                self.notifiers[mutations[0].0].err = Some(Arc::new(err));
245            }
246        } else {
247            let mut tasks = FuturesUnordered::new();
248            for (i, kvs) in mutations {
249                let mutable = mutable_memtable.clone();
250                // use tokio runtime to schedule tasks.
251                tasks.push(common_runtime::spawn_blocking_global(move || {
252                    (i, mutable.write(&kvs))
253                }));
254            }
255
256            while let Some(result) = tasks.next().await {
257                // first unwrap the result from `spawn` above
258                let (i, result) = result.unwrap();
259                if let Err(err) = result {
260                    self.notifiers[i].err = Some(Arc::new(err));
261                }
262            }
263        }
264
265        if let Some(written_bytes) = &self.written_bytes {
266            let new_memory_usage = mutable_memtable.memory_usage();
267            let bytes = new_memory_usage.saturating_sub(prev_memory_usage.unwrap_or_default());
268            written_bytes.fetch_add(bytes as u64, Ordering::Relaxed);
269        }
270        // Updates region sequence and entry id. Since we stores last sequence and entry id in region, we need
271        // to decrease `next_sequence` and `next_entry_id` by 1.
272        self.version_control
273            .set_sequence_and_entry_id(self.next_sequence - 1, self.next_entry_id - 1);
274    }
275
276    pub(crate) fn push_bulk(
277        &mut self,
278        sender: OptionOutputTx,
279        mut bulk: BulkPart,
280        sequence: Option<SequenceNumber>,
281    ) -> bool {
282        if let Some(sequence) = sequence {
283            self.next_sequence = sequence;
284        }
285        bulk.sequence = self.next_sequence;
286        let entry = match BulkWalEntry::try_from(&bulk) {
287            Ok(entry) => entry,
288            Err(e) => {
289                sender.send(Err(e));
290                return false;
291            }
292        };
293
294        self.bulk_notifiers
295            .push(WriteNotify::new(sender, bulk.num_rows()));
296
297        // Add bulk wal entry
298        self.wal_entry.bulk_entries.push(entry);
299        self.next_sequence += bulk.num_rows() as u64;
300        self.bulk_parts.push(bulk);
301        true
302    }
303
304    pub(crate) async fn write_bulk(&mut self) {
305        if self.failed || self.bulk_parts.is_empty() {
306            return;
307        }
308        let _timer = metrics::REGION_WORKER_HANDLE_WRITE_ELAPSED
309            .with_label_values(&["write_bulk"])
310            .start_timer();
311
312        let mutable_memtable = &self.version.memtables.mutable;
313        let prev_memory_usage = if self.written_bytes.is_some() {
314            Some(mutable_memtable.memory_usage())
315        } else {
316            None
317        };
318
319        if self.bulk_parts.len() == 1 {
320            let part = self.bulk_parts.swap_remove(0);
321            let num_rows = part.num_rows();
322            if let Err(e) = self.version.memtables.mutable.write_bulk(part) {
323                self.bulk_notifiers[0].err = Some(Arc::new(e));
324            } else {
325                self.put_num += num_rows;
326            }
327            return;
328        }
329
330        let mut tasks = FuturesUnordered::new();
331        for (i, part) in self.bulk_parts.drain(..).enumerate() {
332            let mutable = mutable_memtable.clone();
333            tasks.push(common_runtime::spawn_blocking_global(move || {
334                let num_rows = part.num_rows();
335                (i, mutable.write_bulk(part), num_rows)
336            }));
337        }
338        while let Some(result) = tasks.next().await {
339            // first unwrap the result from `spawn` above
340            let (i, result, num_rows) = result.unwrap();
341            if let Err(err) = result {
342                self.bulk_notifiers[i].err = Some(Arc::new(err));
343            } else {
344                self.put_num += num_rows;
345            }
346        }
347
348        if let Some(written_bytes) = &self.written_bytes {
349            let new_memory_usage = mutable_memtable.memory_usage();
350            let bytes = new_memory_usage.saturating_sub(prev_memory_usage.unwrap_or_default());
351            written_bytes.fetch_add(bytes as u64, Ordering::Relaxed);
352        }
353        self.version_control
354            .set_sequence_and_entry_id(self.next_sequence - 1, self.next_entry_id - 1);
355    }
356}