1use std::mem;
16use std::sync::Arc;
17
18use api::v1::{Mutation, OpType, Rows, WalEntry, WriteHint};
19use futures::stream::{FuturesUnordered, StreamExt};
20use snafu::ResultExt;
21use store_api::logstore::provider::Provider;
22use store_api::logstore::LogStore;
23use store_api::storage::{RegionId, SequenceNumber};
24
25use crate::error::{Error, Result, WriteGroupSnafu};
26use crate::memtable::bulk::part::BulkPart;
27use crate::memtable::KeyValues;
28use crate::metrics;
29use crate::region::version::{VersionControlData, VersionControlRef, VersionRef};
30use crate::request::OptionOutputTx;
31use crate::wal::{EntryId, WalWriter};
32
/// Notifier that reports the outcome of one write request to its waiter.
///
/// The result is sent through `sender` by `notify_result`, which the `Drop`
/// implementation invokes.
struct WriteNotify {
    /// Error to report to the waiter; `None` means the write is reported as successful.
    err: Option<Arc<Error>>,
    /// Sender used to deliver the write result to the waiter.
    sender: OptionOutputTx,
    /// Number of rows of the request, reported to the waiter when no error is set.
    num_rows: usize,
}
42
43impl WriteNotify {
44 fn new(sender: OptionOutputTx, num_rows: usize) -> WriteNotify {
46 WriteNotify {
47 err: None,
48 sender,
49 num_rows,
50 }
51 }
52
53 fn notify_result(&mut self) {
55 if let Some(err) = &self.err {
56 self.sender
58 .send_mut(Err(err.clone()).context(WriteGroupSnafu));
59 } else {
60 self.sender.send_mut(Ok(self.num_rows));
62 }
63 }
64}
65
66impl Drop for WriteNotify {
67 fn drop(&mut self) {
68 self.notify_result();
69 }
70}
71
/// Context that buffers the write requests of one region and tracks the
/// sequence number and WAL entry id to use for them.
pub(crate) struct RegionWriteCtx {
    /// Id of the region being written; passed to the WAL writer with each entry.
    region_id: RegionId,
    /// Version obtained from the version control when the context was created.
    version: VersionRef,
    /// Version control of the region; receives the committed sequence and entry id after writes.
    version_control: VersionControlRef,
    /// Sequence number assigned to the next pushed mutation or bulk part.
    ///
    /// Starts at the committed sequence plus one and advances by the row count of each push.
    next_sequence: SequenceNumber,
    /// Entry id used for the next WAL entry.
    ///
    /// Starts at the last entry id plus one and advances after each entry is added to the WAL writer.
    next_entry_id: EntryId,
    /// WAL entry that accumulates the pushed mutations.
    wal_entry: WalEntry,
    /// Log store provider passed to the WAL writer together with the entry.
    provider: Provider,
    /// Notifiers for pushed mutations.
    ///
    /// The i-th notifier belongs to the i-th mutation of `wal_entry`.
    notifiers: Vec<WriteNotify>,
    /// Notifiers for pushed bulk parts.
    ///
    /// The i-th notifier belongs to the i-th pushed bulk part.
    bulk_notifiers: Vec<WriteNotify>,
    /// Bulk parts pushed via `push_bulk` that are waiting to be written by `write_bulk`.
    pub(crate) bulk_parts: Vec<BulkPart>,
    /// Set by `set_error`; when true, `write_memtable` and `write_bulk` skip writing.
    failed: bool,

    /// Number of rows counted as puts (pushed put mutations and successfully written bulk parts).
    pub(crate) put_num: usize,
    /// Number of rows counted as deletes (pushed delete mutations).
    pub(crate) delete_num: usize,
}
110
111impl RegionWriteCtx {
112 pub(crate) fn new(
114 region_id: RegionId,
115 version_control: &VersionControlRef,
116 provider: Provider,
117 ) -> RegionWriteCtx {
118 let VersionControlData {
119 version,
120 committed_sequence,
121 last_entry_id,
122 ..
123 } = version_control.current();
124
125 RegionWriteCtx {
126 region_id,
127 version,
128 version_control: version_control.clone(),
129 next_sequence: committed_sequence + 1,
130 next_entry_id: last_entry_id + 1,
131 wal_entry: WalEntry::default(),
132 provider,
133 notifiers: Vec::new(),
134 bulk_notifiers: vec![],
135 failed: false,
136 put_num: 0,
137 delete_num: 0,
138 bulk_parts: vec![],
139 }
140 }
141
142 pub(crate) fn push_mutation(
144 &mut self,
145 op_type: i32,
146 rows: Option<Rows>,
147 write_hint: Option<WriteHint>,
148 tx: OptionOutputTx,
149 ) {
150 let num_rows = rows.as_ref().map(|rows| rows.rows.len()).unwrap_or(0);
151 self.wal_entry.mutations.push(Mutation {
152 op_type,
153 sequence: self.next_sequence,
154 rows,
155 write_hint,
156 });
157
158 let notify = WriteNotify::new(tx, num_rows);
159 self.notifiers.push(notify);
161
162 self.next_sequence += num_rows as u64;
164
165 match OpType::try_from(op_type) {
167 Ok(OpType::Delete) => self.delete_num += num_rows,
168 Ok(OpType::Put) => self.put_num += num_rows,
169 Err(_) => (),
170 }
171 }
172
173 pub(crate) fn add_wal_entry<S: LogStore>(
175 &mut self,
176 wal_writer: &mut WalWriter<S>,
177 ) -> Result<()> {
178 wal_writer.add_entry(
179 self.region_id,
180 self.next_entry_id,
181 &self.wal_entry,
182 &self.provider,
183 )?;
184 self.next_entry_id += 1;
185 Ok(())
186 }
187
    /// Returns the version held by this context.
    pub(crate) fn version(&self) -> &VersionRef {
        &self.version
    }
191
192 pub(crate) fn set_error(&mut self, err: Arc<Error>) {
194 for notify in &mut self.notifiers {
196 notify.err = Some(err.clone());
197 }
198
199 self.failed = true;
201 }
202
203 pub(crate) fn set_next_entry_id(&mut self, next_entry_id: EntryId) {
205 self.next_entry_id = next_entry_id
206 }
207
208 pub(crate) async fn write_memtable(&mut self) {
210 debug_assert_eq!(self.notifiers.len(), self.wal_entry.mutations.len());
211
212 if self.failed {
213 return;
214 }
215
216 let mutable = self.version.memtables.mutable.clone();
217 let mutations = mem::take(&mut self.wal_entry.mutations)
218 .into_iter()
219 .enumerate()
220 .filter_map(|(i, mutation)| {
221 let kvs = KeyValues::new(&self.version.metadata, mutation)?;
222 Some((i, kvs))
223 })
224 .collect::<Vec<_>>();
225
226 if mutations.len() == 1 {
227 if let Err(err) = mutable.write(&mutations[0].1) {
228 self.notifiers[mutations[0].0].err = Some(Arc::new(err));
229 }
230 } else {
231 let mut tasks = FuturesUnordered::new();
232 for (i, kvs) in mutations {
233 let mutable = mutable.clone();
234 tasks.push(common_runtime::spawn_blocking_global(move || {
236 (i, mutable.write(&kvs))
237 }));
238 }
239
240 while let Some(result) = tasks.next().await {
241 let (i, result) = result.unwrap();
243 if let Err(err) = result {
244 self.notifiers[i].err = Some(Arc::new(err));
245 }
246 }
247 }
248
249 self.version_control
252 .set_sequence_and_entry_id(self.next_sequence - 1, self.next_entry_id - 1);
253 }
254
255 pub(crate) fn push_bulk(&mut self, sender: OptionOutputTx, mut bulk: BulkPart) {
256 self.bulk_notifiers
257 .push(WriteNotify::new(sender, bulk.num_rows));
258 bulk.sequence = self.next_sequence;
259 self.next_sequence += bulk.num_rows as u64;
260 self.bulk_parts.push(bulk);
261 }
262
263 pub(crate) async fn write_bulk(&mut self) {
264 if self.failed || self.bulk_parts.is_empty() {
265 return;
266 }
267 let _timer = metrics::REGION_WORKER_HANDLE_WRITE_ELAPSED
268 .with_label_values(&["write_bulk"])
269 .start_timer();
270
271 if self.bulk_parts.len() == 1 {
272 let part = self.bulk_parts.swap_remove(0);
273 let num_rows = part.num_rows;
274 if let Err(e) = self.version.memtables.mutable.write_bulk(part) {
275 self.bulk_notifiers[0].err = Some(Arc::new(e));
276 } else {
277 self.put_num += num_rows;
278 }
279 return;
280 }
281
282 let mut tasks = FuturesUnordered::new();
283 for (i, part) in self.bulk_parts.drain(..).enumerate() {
284 let mutable = self.version.memtables.mutable.clone();
285 tasks.push(common_runtime::spawn_blocking_global(move || {
286 let num_rows = part.num_rows;
287 (i, mutable.write_bulk(part), num_rows)
288 }));
289 }
290 while let Some(result) = tasks.next().await {
291 let (i, result, num_rows) = result.unwrap();
293 if let Err(err) = result {
294 self.bulk_notifiers[i].err = Some(Arc::new(err));
295 } else {
296 self.put_num += num_rows;
297 }
298 }
299
300 self.version_control
301 .set_sequence_and_entry_id(self.next_sequence - 1, self.next_entry_id - 1);
302 }
303}