mito2/
region_write_ctx.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::mem;
16use std::sync::Arc;
17
18use api::v1::{Mutation, OpType, Rows, WalEntry, WriteHint};
19use futures::stream::{FuturesUnordered, StreamExt};
20use snafu::ResultExt;
21use store_api::logstore::provider::Provider;
22use store_api::logstore::LogStore;
23use store_api::storage::{RegionId, SequenceNumber};
24
25use crate::error::{Error, Result, WriteGroupSnafu};
26use crate::memtable::bulk::part::BulkPart;
27use crate::memtable::KeyValues;
28use crate::metrics;
29use crate::region::version::{VersionControlData, VersionControlRef, VersionRef};
30use crate::request::OptionOutputTx;
31use crate::wal::{EntryId, WalWriter};
32
/// Notifier that sends the result of a write to its waiter when it is dropped.
///
/// The waiter receives the error stored in `err` if there is one, otherwise
/// the number of rows in `num_rows`.
struct WriteNotify {
    /// Error to send to the waiter. `None` means the write is reported as successful.
    err: Option<Arc<Error>>,
    /// Sender to send write result to the waiter for this mutation.
    sender: OptionOutputTx,
    /// Number of rows to be written; this is the value reported on success.
    num_rows: usize,
}
42
43impl WriteNotify {
44    /// Creates a new notify from the `sender`.
45    fn new(sender: OptionOutputTx, num_rows: usize) -> WriteNotify {
46        WriteNotify {
47            err: None,
48            sender,
49            num_rows,
50        }
51    }
52
53    /// Send result to the waiter.
54    fn notify_result(&mut self) {
55        if let Some(err) = &self.err {
56            // Try to send the error to waiters.
57            self.sender
58                .send_mut(Err(err.clone()).context(WriteGroupSnafu));
59        } else {
60            // Send success result.
61            self.sender.send_mut(Ok(self.num_rows));
62        }
63    }
64}
65
66impl Drop for WriteNotify {
67    fn drop(&mut self) {
68        self.notify_result();
69    }
70}
71
/// Context that keeps region metadata and buffers the write requests of one region.
///
/// Mutations and bulk parts are pushed into the context first, then written to
/// the WAL writer and the mutable memtable. Each buffered request owns a
/// notifier that reports the result to its waiter when the notifier is dropped.
pub(crate) struct RegionWriteCtx {
    /// Id of region to write.
    region_id: RegionId,
    /// Version of the region captured when the context was created.
    version: VersionRef,
    /// VersionControl of the region; receives the last sequence and entry id after writes.
    version_control: VersionControlRef,
    /// Next sequence number to write.
    ///
    /// The context assigns a unique sequence number to each row, so this
    /// advances by the number of rows of every pushed mutation or bulk part.
    next_sequence: SequenceNumber,
    /// Next entry id of WAL to write.
    next_entry_id: EntryId,
    /// Valid WAL entry to write.
    ///
    /// We keep [WalEntry] instead of mutations to avoid taking mutations
    /// out of the context to construct the wal entry when we write to the wal.
    wal_entry: WalEntry,
    /// Log store provider of the region, handed to the WAL writer together with the entry.
    provider: Provider,
    /// Notifiers to send write results to waiters.
    ///
    /// The i-th notifier belongs to the i-th mutation.
    notifiers: Vec<WriteNotify>,
    /// Notifiers for bulk requests.
    ///
    /// The i-th notifier belongs to the i-th pushed bulk part.
    bulk_notifiers: Vec<WriteNotify>,
    /// Pending bulk write requests.
    pub(crate) bulk_parts: Vec<BulkPart>,
    /// Set when the write operation has failed; memtable writes are skipped in that case.
    failed: bool,

    // Metrics:
    /// Rows to put.
    pub(crate) put_num: usize,
    /// Rows to delete.
    pub(crate) delete_num: usize,
}
110
111impl RegionWriteCtx {
112    /// Returns an empty context.
113    pub(crate) fn new(
114        region_id: RegionId,
115        version_control: &VersionControlRef,
116        provider: Provider,
117    ) -> RegionWriteCtx {
118        let VersionControlData {
119            version,
120            committed_sequence,
121            last_entry_id,
122            ..
123        } = version_control.current();
124
125        RegionWriteCtx {
126            region_id,
127            version,
128            version_control: version_control.clone(),
129            next_sequence: committed_sequence + 1,
130            next_entry_id: last_entry_id + 1,
131            wal_entry: WalEntry::default(),
132            provider,
133            notifiers: Vec::new(),
134            bulk_notifiers: vec![],
135            failed: false,
136            put_num: 0,
137            delete_num: 0,
138            bulk_parts: vec![],
139        }
140    }
141
142    /// Push mutation to the context.
143    pub(crate) fn push_mutation(
144        &mut self,
145        op_type: i32,
146        rows: Option<Rows>,
147        write_hint: Option<WriteHint>,
148        tx: OptionOutputTx,
149    ) {
150        let num_rows = rows.as_ref().map(|rows| rows.rows.len()).unwrap_or(0);
151        self.wal_entry.mutations.push(Mutation {
152            op_type,
153            sequence: self.next_sequence,
154            rows,
155            write_hint,
156        });
157
158        let notify = WriteNotify::new(tx, num_rows);
159        // Notifiers are 1:1 map to mutations.
160        self.notifiers.push(notify);
161
162        // Increase sequence number.
163        self.next_sequence += num_rows as u64;
164
165        // Update metrics.
166        match OpType::try_from(op_type) {
167            Ok(OpType::Delete) => self.delete_num += num_rows,
168            Ok(OpType::Put) => self.put_num += num_rows,
169            Err(_) => (),
170        }
171    }
172
173    /// Encode and add WAL entry to the writer.
174    pub(crate) fn add_wal_entry<S: LogStore>(
175        &mut self,
176        wal_writer: &mut WalWriter<S>,
177    ) -> Result<()> {
178        wal_writer.add_entry(
179            self.region_id,
180            self.next_entry_id,
181            &self.wal_entry,
182            &self.provider,
183        )?;
184        self.next_entry_id += 1;
185        Ok(())
186    }
187
188    pub(crate) fn version(&self) -> &VersionRef {
189        &self.version
190    }
191
192    /// Sets error and marks all write operations are failed.
193    pub(crate) fn set_error(&mut self, err: Arc<Error>) {
194        // Set error for all notifiers
195        for notify in &mut self.notifiers {
196            notify.err = Some(err.clone());
197        }
198
199        // Fail the whole write operation.
200        self.failed = true;
201    }
202
203    /// Updates next entry id.
204    pub(crate) fn set_next_entry_id(&mut self, next_entry_id: EntryId) {
205        self.next_entry_id = next_entry_id
206    }
207
208    /// Consumes mutations and writes them into mutable memtable.
209    pub(crate) async fn write_memtable(&mut self) {
210        debug_assert_eq!(self.notifiers.len(), self.wal_entry.mutations.len());
211
212        if self.failed {
213            return;
214        }
215
216        let mutable = self.version.memtables.mutable.clone();
217        let mutations = mem::take(&mut self.wal_entry.mutations)
218            .into_iter()
219            .enumerate()
220            .filter_map(|(i, mutation)| {
221                let kvs = KeyValues::new(&self.version.metadata, mutation)?;
222                Some((i, kvs))
223            })
224            .collect::<Vec<_>>();
225
226        if mutations.len() == 1 {
227            if let Err(err) = mutable.write(&mutations[0].1) {
228                self.notifiers[mutations[0].0].err = Some(Arc::new(err));
229            }
230        } else {
231            let mut tasks = FuturesUnordered::new();
232            for (i, kvs) in mutations {
233                let mutable = mutable.clone();
234                // use tokio runtime to schedule tasks.
235                tasks.push(common_runtime::spawn_blocking_global(move || {
236                    (i, mutable.write(&kvs))
237                }));
238            }
239
240            while let Some(result) = tasks.next().await {
241                // first unwrap the result from `spawn` above
242                let (i, result) = result.unwrap();
243                if let Err(err) = result {
244                    self.notifiers[i].err = Some(Arc::new(err));
245                }
246            }
247        }
248
249        // Updates region sequence and entry id. Since we stores last sequence and entry id in region, we need
250        // to decrease `next_sequence` and `next_entry_id` by 1.
251        self.version_control
252            .set_sequence_and_entry_id(self.next_sequence - 1, self.next_entry_id - 1);
253    }
254
255    pub(crate) fn push_bulk(&mut self, sender: OptionOutputTx, mut bulk: BulkPart) {
256        self.bulk_notifiers
257            .push(WriteNotify::new(sender, bulk.num_rows));
258        bulk.sequence = self.next_sequence;
259        self.next_sequence += bulk.num_rows as u64;
260        self.bulk_parts.push(bulk);
261    }
262
263    pub(crate) async fn write_bulk(&mut self) {
264        if self.failed || self.bulk_parts.is_empty() {
265            return;
266        }
267        let _timer = metrics::REGION_WORKER_HANDLE_WRITE_ELAPSED
268            .with_label_values(&["write_bulk"])
269            .start_timer();
270
271        if self.bulk_parts.len() == 1 {
272            let part = self.bulk_parts.swap_remove(0);
273            let num_rows = part.num_rows;
274            if let Err(e) = self.version.memtables.mutable.write_bulk(part) {
275                self.bulk_notifiers[0].err = Some(Arc::new(e));
276            } else {
277                self.put_num += num_rows;
278            }
279            return;
280        }
281
282        let mut tasks = FuturesUnordered::new();
283        for (i, part) in self.bulk_parts.drain(..).enumerate() {
284            let mutable = self.version.memtables.mutable.clone();
285            tasks.push(common_runtime::spawn_blocking_global(move || {
286                let num_rows = part.num_rows;
287                (i, mutable.write_bulk(part), num_rows)
288            }));
289        }
290        while let Some(result) = tasks.next().await {
291            // first unwrap the result from `spawn` above
292            let (i, result, num_rows) = result.unwrap();
293            if let Err(err) = result {
294                self.bulk_notifiers[i].err = Some(Arc::new(err));
295            } else {
296                self.put_num += num_rows;
297            }
298        }
299
300        self.version_control
301            .set_sequence_and_entry_id(self.next_sequence - 1, self.next_entry_id - 1);
302    }
303}