mito2/
region_write_ctx.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::mem;
16use std::sync::atomic::{AtomicU64, Ordering};
17use std::sync::Arc;
18
19use api::v1::{BulkWalEntry, Mutation, OpType, Rows, WalEntry, WriteHint};
20use futures::stream::{FuturesUnordered, StreamExt};
21use snafu::ResultExt;
22use store_api::logstore::provider::Provider;
23use store_api::logstore::LogStore;
24use store_api::storage::{RegionId, SequenceNumber};
25
26use crate::error::{Error, Result, WriteGroupSnafu};
27use crate::memtable::bulk::part::BulkPart;
28use crate::memtable::KeyValues;
29use crate::metrics;
30use crate::region::version::{VersionControlData, VersionControlRef, VersionRef};
31use crate::request::OptionOutputTx;
32use crate::wal::{EntryId, WalWriter};
33
/// Notifier to notify write result on drop.
///
/// The result is sent from the `Drop` impl: whenever a notifier is dropped, its waiter is
/// sent the recorded error if `err` is set, otherwise `Ok(num_rows)`.
struct WriteNotify {
    /// Error to send to the waiter.
    ///
    /// `None` means the write is reported as successful.
    err: Option<Arc<Error>>,
    /// Sender to send write result to the waiter for this mutation (or bulk request).
    sender: OptionOutputTx,
    /// Number of rows to be written; this is the value reported to the waiter on success.
    num_rows: usize,
}
43
44impl WriteNotify {
45    /// Creates a new notify from the `sender`.
46    fn new(sender: OptionOutputTx, num_rows: usize) -> WriteNotify {
47        WriteNotify {
48            err: None,
49            sender,
50            num_rows,
51        }
52    }
53
54    /// Send result to the waiter.
55    fn notify_result(&mut self) {
56        if let Some(err) = &self.err {
57            // Try to send the error to waiters.
58            self.sender
59                .send_mut(Err(err.clone()).context(WriteGroupSnafu));
60        } else {
61            // Send success result.
62            self.sender.send_mut(Ok(self.num_rows));
63        }
64    }
65}
66
67impl Drop for WriteNotify {
68    fn drop(&mut self) {
69        self.notify_result();
70    }
71}
72
/// Context to keep region metadata and buffer write requests.
///
/// Pending row mutations and bulk parts are buffered here, encoded into a single WAL
/// entry, and then written into the region's mutable memtable. Each buffered request owns
/// a `WriteNotify`, which sends the request's result to its waiter when dropped.
pub(crate) struct RegionWriteCtx {
    /// Id of region to write.
    region_id: RegionId,
    /// Version of the region while creating the context.
    version: VersionRef,
    /// VersionControl of the region.
    version_control: VersionControlRef,
    /// Next sequence number to write.
    ///
    /// The context assigns a unique sequence number for each row.
    next_sequence: SequenceNumber,
    /// Next entry id of WAL to write.
    next_entry_id: EntryId,
    /// Valid WAL entry to write.
    ///
    /// We keep [WalEntry] instead of mutations to avoid taking mutations
    /// out of the context to construct the wal entry when we write to the wal.
    wal_entry: WalEntry,
    /// WAL provider of the region being written to.
    provider: Provider,
    /// Notifiers to send write results to waiters.
    ///
    /// The i-th notify is for i-th mutation.
    notifiers: Vec<WriteNotify>,
    /// Notifiers for bulk requests.
    ///
    /// The i-th notify is for the i-th bulk part pushed via `push_bulk`.
    bulk_notifiers: Vec<WriteNotify>,
    /// Pending bulk write requests
    pub(crate) bulk_parts: Vec<BulkPart>,
    /// The write operation is failed and we should not write to the mutable memtable.
    ///
    /// Set by `set_error`; both `write_memtable` and `write_bulk` return early when set.
    failed: bool,

    // Metrics:
    /// Rows to put.
    ///
    /// Mutation rows are counted when pushed; bulk rows are counted only after they are
    /// successfully written to the memtable.
    pub(crate) put_num: usize,
    /// Rows to delete.
    pub(crate) delete_num: usize,
    /// The total bytes written to the region.
    ///
    /// Accumulated from the growth of the mutable memtable's memory usage across a write;
    /// `None` disables this accounting.
    pub(crate) written_bytes: Option<Arc<AtomicU64>>,
}
113
114impl RegionWriteCtx {
115    /// Returns an empty context.
116    pub(crate) fn new(
117        region_id: RegionId,
118        version_control: &VersionControlRef,
119        provider: Provider,
120        written_bytes: Option<Arc<AtomicU64>>,
121    ) -> RegionWriteCtx {
122        let VersionControlData {
123            version,
124            committed_sequence,
125            last_entry_id,
126            ..
127        } = version_control.current();
128
129        RegionWriteCtx {
130            region_id,
131            version,
132            version_control: version_control.clone(),
133            next_sequence: committed_sequence + 1,
134            next_entry_id: last_entry_id + 1,
135            wal_entry: WalEntry::default(),
136            provider,
137            notifiers: Vec::new(),
138            bulk_notifiers: vec![],
139            failed: false,
140            put_num: 0,
141            delete_num: 0,
142            bulk_parts: vec![],
143            written_bytes,
144        }
145    }
146
147    /// Push mutation to the context.
148    pub(crate) fn push_mutation(
149        &mut self,
150        op_type: i32,
151        rows: Option<Rows>,
152        write_hint: Option<WriteHint>,
153        tx: OptionOutputTx,
154    ) {
155        let num_rows = rows.as_ref().map(|rows| rows.rows.len()).unwrap_or(0);
156        self.wal_entry.mutations.push(Mutation {
157            op_type,
158            sequence: self.next_sequence,
159            rows,
160            write_hint,
161        });
162
163        let notify = WriteNotify::new(tx, num_rows);
164        // Notifiers are 1:1 map to mutations.
165        self.notifiers.push(notify);
166
167        // Increase sequence number.
168        self.next_sequence += num_rows as u64;
169
170        // Update metrics.
171        match OpType::try_from(op_type) {
172            Ok(OpType::Delete) => self.delete_num += num_rows,
173            Ok(OpType::Put) => self.put_num += num_rows,
174            Err(_) => (),
175        }
176    }
177
178    /// Encode and add WAL entry to the writer.
179    pub(crate) fn add_wal_entry<S: LogStore>(
180        &mut self,
181        wal_writer: &mut WalWriter<S>,
182    ) -> Result<()> {
183        wal_writer.add_entry(
184            self.region_id,
185            self.next_entry_id,
186            &self.wal_entry,
187            &self.provider,
188        )?;
189        self.next_entry_id += 1;
190        Ok(())
191    }
192
193    pub(crate) fn version(&self) -> &VersionRef {
194        &self.version
195    }
196
197    /// Sets error and marks all write operations are failed.
198    pub(crate) fn set_error(&mut self, err: Arc<Error>) {
199        // Set error for all notifiers
200        for notify in &mut self.notifiers {
201            notify.err = Some(err.clone());
202        }
203
204        // Fail the whole write operation.
205        self.failed = true;
206    }
207
208    /// Updates next entry id.
209    pub(crate) fn set_next_entry_id(&mut self, next_entry_id: EntryId) {
210        self.next_entry_id = next_entry_id
211    }
212
213    /// Consumes mutations and writes them into mutable memtable.
214    pub(crate) async fn write_memtable(&mut self) {
215        debug_assert_eq!(self.notifiers.len(), self.wal_entry.mutations.len());
216
217        if self.failed {
218            return;
219        }
220
221        let mutable_memtable = self.version.memtables.mutable.clone();
222        let prev_memory_usage = if self.written_bytes.is_some() {
223            Some(mutable_memtable.memory_usage())
224        } else {
225            None
226        };
227
228        let mutations = mem::take(&mut self.wal_entry.mutations)
229            .into_iter()
230            .enumerate()
231            .filter_map(|(i, mutation)| {
232                let kvs = KeyValues::new(&self.version.metadata, mutation)?;
233                Some((i, kvs))
234            })
235            .collect::<Vec<_>>();
236
237        if mutations.len() == 1 {
238            if let Err(err) = mutable_memtable.write(&mutations[0].1) {
239                self.notifiers[mutations[0].0].err = Some(Arc::new(err));
240            }
241        } else {
242            let mut tasks = FuturesUnordered::new();
243            for (i, kvs) in mutations {
244                let mutable = mutable_memtable.clone();
245                // use tokio runtime to schedule tasks.
246                tasks.push(common_runtime::spawn_blocking_global(move || {
247                    (i, mutable.write(&kvs))
248                }));
249            }
250
251            while let Some(result) = tasks.next().await {
252                // first unwrap the result from `spawn` above
253                let (i, result) = result.unwrap();
254                if let Err(err) = result {
255                    self.notifiers[i].err = Some(Arc::new(err));
256                }
257            }
258        }
259
260        if let Some(written_bytes) = &self.written_bytes {
261            let new_memory_usage = mutable_memtable.memory_usage();
262            let bytes = new_memory_usage.saturating_sub(prev_memory_usage.unwrap_or_default());
263            written_bytes.fetch_add(bytes as u64, Ordering::Relaxed);
264        }
265        // Updates region sequence and entry id. Since we stores last sequence and entry id in region, we need
266        // to decrease `next_sequence` and `next_entry_id` by 1.
267        self.version_control
268            .set_sequence_and_entry_id(self.next_sequence - 1, self.next_entry_id - 1);
269    }
270
271    pub(crate) fn push_bulk(&mut self, sender: OptionOutputTx, mut bulk: BulkPart) -> bool {
272        bulk.sequence = self.next_sequence;
273        let entry = match BulkWalEntry::try_from(&bulk) {
274            Ok(entry) => entry,
275            Err(e) => {
276                sender.send(Err(e));
277                return false;
278            }
279        };
280
281        self.bulk_notifiers
282            .push(WriteNotify::new(sender, bulk.num_rows()));
283
284        // Add bulk wal entry
285        self.wal_entry.bulk_entries.push(entry);
286        self.next_sequence += bulk.num_rows() as u64;
287        self.bulk_parts.push(bulk);
288        true
289    }
290
291    pub(crate) async fn write_bulk(&mut self) {
292        if self.failed || self.bulk_parts.is_empty() {
293            return;
294        }
295        let _timer = metrics::REGION_WORKER_HANDLE_WRITE_ELAPSED
296            .with_label_values(&["write_bulk"])
297            .start_timer();
298
299        let mutable_memtable = &self.version.memtables.mutable;
300        let prev_memory_usage = if self.written_bytes.is_some() {
301            Some(mutable_memtable.memory_usage())
302        } else {
303            None
304        };
305
306        if self.bulk_parts.len() == 1 {
307            let part = self.bulk_parts.swap_remove(0);
308            let num_rows = part.num_rows();
309            if let Err(e) = self.version.memtables.mutable.write_bulk(part) {
310                self.bulk_notifiers[0].err = Some(Arc::new(e));
311            } else {
312                self.put_num += num_rows;
313            }
314            return;
315        }
316
317        let mut tasks = FuturesUnordered::new();
318        for (i, part) in self.bulk_parts.drain(..).enumerate() {
319            let mutable = mutable_memtable.clone();
320            tasks.push(common_runtime::spawn_blocking_global(move || {
321                let num_rows = part.num_rows();
322                (i, mutable.write_bulk(part), num_rows)
323            }));
324        }
325        while let Some(result) = tasks.next().await {
326            // first unwrap the result from `spawn` above
327            let (i, result, num_rows) = result.unwrap();
328            if let Err(err) = result {
329                self.bulk_notifiers[i].err = Some(Arc::new(err));
330            } else {
331                self.put_num += num_rows;
332            }
333        }
334
335        if let Some(written_bytes) = &self.written_bytes {
336            let new_memory_usage = mutable_memtable.memory_usage();
337            let bytes = new_memory_usage.saturating_sub(prev_memory_usage.unwrap_or_default());
338            written_bytes.fetch_add(bytes as u64, Ordering::Relaxed);
339        }
340        self.version_control
341            .set_sequence_and_entry_id(self.next_sequence - 1, self.next_entry_id - 1);
342    }
343}