mito2/
region_write_ctx.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::mem;
16use std::sync::Arc;
17
18use api::v1::{Mutation, OpType, Rows, WalEntry, WriteHint};
19use futures::stream::{FuturesUnordered, StreamExt};
20use snafu::ResultExt;
21use store_api::logstore::provider::Provider;
22use store_api::logstore::LogStore;
23use store_api::storage::{RegionId, SequenceNumber};
24
25use crate::error::{Error, Result, WriteGroupSnafu};
26use crate::memtable::KeyValues;
27use crate::region::version::{VersionControlData, VersionControlRef, VersionRef};
28use crate::request::OptionOutputTx;
29use crate::wal::{EntryId, WalWriter};
30
31/// Notifier to notify write result on drop.
32struct WriteNotify {
33    /// Error to send to the waiter.
34    err: Option<Arc<Error>>,
35    /// Sender to send write result to the waiter for this mutation.
36    sender: OptionOutputTx,
37    /// Number of rows to be written.
38    num_rows: usize,
39}
40
41impl WriteNotify {
42    /// Creates a new notify from the `sender`.
43    fn new(sender: OptionOutputTx, num_rows: usize) -> WriteNotify {
44        WriteNotify {
45            err: None,
46            sender,
47            num_rows,
48        }
49    }
50
51    /// Send result to the waiter.
52    fn notify_result(&mut self) {
53        if let Some(err) = &self.err {
54            // Try to send the error to waiters.
55            self.sender
56                .send_mut(Err(err.clone()).context(WriteGroupSnafu));
57        } else {
58            // Send success result.
59            self.sender.send_mut(Ok(self.num_rows));
60        }
61    }
62}
63
64impl Drop for WriteNotify {
65    fn drop(&mut self) {
66        self.notify_result();
67    }
68}
69
70/// Context to keep region metadata and buffer write requests.
71pub(crate) struct RegionWriteCtx {
72    /// Id of region to write.
73    region_id: RegionId,
74    /// Version of the region while creating the context.
75    version: VersionRef,
76    /// VersionControl of the region.
77    version_control: VersionControlRef,
78    /// Next sequence number to write.
79    ///
80    /// The context assigns a unique sequence number for each row.
81    next_sequence: SequenceNumber,
82    /// Next entry id of WAL to write.
83    next_entry_id: EntryId,
84    /// Valid WAL entry to write.
85    ///
86    /// We keep [WalEntry] instead of mutations to avoid taking mutations
87    /// out of the context to construct the wal entry when we write to the wal.
88    wal_entry: WalEntry,
89    /// Wal options of the region being written to.
90    provider: Provider,
91    /// Notifiers to send write results to waiters.
92    ///
93    /// The i-th notify is for i-th mutation.
94    notifiers: Vec<WriteNotify>,
95    /// The write operation is failed and we should not write to the mutable memtable.
96    failed: bool,
97
98    // Metrics:
99    /// Rows to put.
100    pub(crate) put_num: usize,
101    /// Rows to delete.
102    pub(crate) delete_num: usize,
103}
104
105impl RegionWriteCtx {
106    /// Returns an empty context.
107    pub(crate) fn new(
108        region_id: RegionId,
109        version_control: &VersionControlRef,
110        provider: Provider,
111    ) -> RegionWriteCtx {
112        let VersionControlData {
113            version,
114            committed_sequence,
115            last_entry_id,
116            ..
117        } = version_control.current();
118
119        RegionWriteCtx {
120            region_id,
121            version,
122            version_control: version_control.clone(),
123            next_sequence: committed_sequence + 1,
124            next_entry_id: last_entry_id + 1,
125            wal_entry: WalEntry::default(),
126            provider,
127            notifiers: Vec::new(),
128            failed: false,
129            put_num: 0,
130            delete_num: 0,
131        }
132    }
133
134    /// Push mutation to the context.
135    pub(crate) fn push_mutation(
136        &mut self,
137        op_type: i32,
138        rows: Option<Rows>,
139        write_hint: Option<WriteHint>,
140        tx: OptionOutputTx,
141    ) {
142        let num_rows = rows.as_ref().map(|rows| rows.rows.len()).unwrap_or(0);
143        self.wal_entry.mutations.push(Mutation {
144            op_type,
145            sequence: self.next_sequence,
146            rows,
147            write_hint,
148        });
149
150        let notify = WriteNotify::new(tx, num_rows);
151        // Notifiers are 1:1 map to mutations.
152        self.notifiers.push(notify);
153
154        // Increase sequence number.
155        self.next_sequence += num_rows as u64;
156
157        // Update metrics.
158        match OpType::try_from(op_type) {
159            Ok(OpType::Delete) => self.delete_num += num_rows,
160            Ok(OpType::Put) => self.put_num += num_rows,
161            Err(_) => (),
162        }
163    }
164
165    /// Encode and add WAL entry to the writer.
166    pub(crate) fn add_wal_entry<S: LogStore>(
167        &mut self,
168        wal_writer: &mut WalWriter<S>,
169    ) -> Result<()> {
170        wal_writer.add_entry(
171            self.region_id,
172            self.next_entry_id,
173            &self.wal_entry,
174            &self.provider,
175        )?;
176        self.next_entry_id += 1;
177        Ok(())
178    }
179
180    pub(crate) fn version(&self) -> &VersionRef {
181        &self.version
182    }
183
184    /// Sets error and marks all write operations are failed.
185    pub(crate) fn set_error(&mut self, err: Arc<Error>) {
186        // Set error for all notifiers
187        for notify in &mut self.notifiers {
188            notify.err = Some(err.clone());
189        }
190
191        // Fail the whole write operation.
192        self.failed = true;
193    }
194
195    /// Updates next entry id.
196    pub(crate) fn set_next_entry_id(&mut self, next_entry_id: EntryId) {
197        self.next_entry_id = next_entry_id
198    }
199
200    /// Consumes mutations and writes them into mutable memtable.
201    pub(crate) async fn write_memtable(&mut self) {
202        debug_assert_eq!(self.notifiers.len(), self.wal_entry.mutations.len());
203
204        if self.failed {
205            return;
206        }
207
208        let mutable = self.version.memtables.mutable.clone();
209        let mutations = mem::take(&mut self.wal_entry.mutations)
210            .into_iter()
211            .enumerate()
212            .filter_map(|(i, mutation)| {
213                let kvs = KeyValues::new(&self.version.metadata, mutation)?;
214                Some((i, kvs))
215            })
216            .collect::<Vec<_>>();
217
218        if mutations.len() == 1 {
219            if let Err(err) = mutable.write(&mutations[0].1) {
220                self.notifiers[mutations[0].0].err = Some(Arc::new(err));
221            }
222        } else {
223            let mut tasks = FuturesUnordered::new();
224            for (i, kvs) in mutations {
225                let mutable = mutable.clone();
226                // use tokio runtime to schedule tasks.
227                tasks.push(common_runtime::spawn_blocking_global(move || {
228                    (i, mutable.write(&kvs))
229                }));
230            }
231
232            while let Some(result) = tasks.next().await {
233                // first unwrap the result from `spawn` above
234                let (i, result) = result.unwrap();
235                if let Err(err) = result {
236                    self.notifiers[i].err = Some(Arc::new(err));
237                }
238            }
239        }
240
241        // Updates region sequence and entry id. Since we stores last sequence and entry id in region, we need
242        // to decrease `next_sequence` and `next_entry_id` by 1.
243        self.version_control
244            .set_sequence_and_entry_id(self.next_sequence - 1, self.next_entry_id - 1);
245    }
246}