mito2/
region_write_ctx.rs1use std::mem;
16use std::sync::Arc;
17
18use api::v1::{Mutation, OpType, Rows, WalEntry, WriteHint};
19use futures::stream::{FuturesUnordered, StreamExt};
20use snafu::ResultExt;
21use store_api::logstore::provider::Provider;
22use store_api::logstore::LogStore;
23use store_api::storage::{RegionId, SequenceNumber};
24
25use crate::error::{Error, Result, WriteGroupSnafu};
26use crate::memtable::KeyValues;
27use crate::region::version::{VersionControlData, VersionControlRef, VersionRef};
28use crate::request::OptionOutputTx;
29use crate::wal::{EntryId, WalWriter};
30
31struct WriteNotify {
33 err: Option<Arc<Error>>,
35 sender: OptionOutputTx,
37 num_rows: usize,
39}
40
41impl WriteNotify {
42 fn new(sender: OptionOutputTx, num_rows: usize) -> WriteNotify {
44 WriteNotify {
45 err: None,
46 sender,
47 num_rows,
48 }
49 }
50
51 fn notify_result(&mut self) {
53 if let Some(err) = &self.err {
54 self.sender
56 .send_mut(Err(err.clone()).context(WriteGroupSnafu));
57 } else {
58 self.sender.send_mut(Ok(self.num_rows));
60 }
61 }
62}
63
64impl Drop for WriteNotify {
65 fn drop(&mut self) {
66 self.notify_result();
67 }
68}
69
70pub(crate) struct RegionWriteCtx {
72 region_id: RegionId,
74 version: VersionRef,
76 version_control: VersionControlRef,
78 next_sequence: SequenceNumber,
82 next_entry_id: EntryId,
84 wal_entry: WalEntry,
89 provider: Provider,
91 notifiers: Vec<WriteNotify>,
95 failed: bool,
97
98 pub(crate) put_num: usize,
101 pub(crate) delete_num: usize,
103}
104
105impl RegionWriteCtx {
106 pub(crate) fn new(
108 region_id: RegionId,
109 version_control: &VersionControlRef,
110 provider: Provider,
111 ) -> RegionWriteCtx {
112 let VersionControlData {
113 version,
114 committed_sequence,
115 last_entry_id,
116 ..
117 } = version_control.current();
118
119 RegionWriteCtx {
120 region_id,
121 version,
122 version_control: version_control.clone(),
123 next_sequence: committed_sequence + 1,
124 next_entry_id: last_entry_id + 1,
125 wal_entry: WalEntry::default(),
126 provider,
127 notifiers: Vec::new(),
128 failed: false,
129 put_num: 0,
130 delete_num: 0,
131 }
132 }
133
134 pub(crate) fn push_mutation(
136 &mut self,
137 op_type: i32,
138 rows: Option<Rows>,
139 write_hint: Option<WriteHint>,
140 tx: OptionOutputTx,
141 ) {
142 let num_rows = rows.as_ref().map(|rows| rows.rows.len()).unwrap_or(0);
143 self.wal_entry.mutations.push(Mutation {
144 op_type,
145 sequence: self.next_sequence,
146 rows,
147 write_hint,
148 });
149
150 let notify = WriteNotify::new(tx, num_rows);
151 self.notifiers.push(notify);
153
154 self.next_sequence += num_rows as u64;
156
157 match OpType::try_from(op_type) {
159 Ok(OpType::Delete) => self.delete_num += num_rows,
160 Ok(OpType::Put) => self.put_num += num_rows,
161 Err(_) => (),
162 }
163 }
164
165 pub(crate) fn add_wal_entry<S: LogStore>(
167 &mut self,
168 wal_writer: &mut WalWriter<S>,
169 ) -> Result<()> {
170 wal_writer.add_entry(
171 self.region_id,
172 self.next_entry_id,
173 &self.wal_entry,
174 &self.provider,
175 )?;
176 self.next_entry_id += 1;
177 Ok(())
178 }
179
180 pub(crate) fn version(&self) -> &VersionRef {
181 &self.version
182 }
183
184 pub(crate) fn set_error(&mut self, err: Arc<Error>) {
186 for notify in &mut self.notifiers {
188 notify.err = Some(err.clone());
189 }
190
191 self.failed = true;
193 }
194
195 pub(crate) fn set_next_entry_id(&mut self, next_entry_id: EntryId) {
197 self.next_entry_id = next_entry_id
198 }
199
200 pub(crate) async fn write_memtable(&mut self) {
202 debug_assert_eq!(self.notifiers.len(), self.wal_entry.mutations.len());
203
204 if self.failed {
205 return;
206 }
207
208 let mutable = self.version.memtables.mutable.clone();
209 let mutations = mem::take(&mut self.wal_entry.mutations)
210 .into_iter()
211 .enumerate()
212 .filter_map(|(i, mutation)| {
213 let kvs = KeyValues::new(&self.version.metadata, mutation)?;
214 Some((i, kvs))
215 })
216 .collect::<Vec<_>>();
217
218 if mutations.len() == 1 {
219 if let Err(err) = mutable.write(&mutations[0].1) {
220 self.notifiers[mutations[0].0].err = Some(Arc::new(err));
221 }
222 } else {
223 let mut tasks = FuturesUnordered::new();
224 for (i, kvs) in mutations {
225 let mutable = mutable.clone();
226 tasks.push(common_runtime::spawn_blocking_global(move || {
228 (i, mutable.write(&kvs))
229 }));
230 }
231
232 while let Some(result) = tasks.next().await {
233 let (i, result) = result.unwrap();
235 if let Err(err) = result {
236 self.notifiers[i].err = Some(Arc::new(err));
237 }
238 }
239 }
240
241 self.version_control
244 .set_sequence_and_entry_id(self.next_sequence - 1, self.next_entry_id - 1);
245 }
246}