use std::mem;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

use api::v1::{BulkWalEntry, Mutation, OpType, Rows, WalEntry, WriteHint};
use futures::stream::{FuturesUnordered, StreamExt};
use snafu::ResultExt;
use store_api::logstore::provider::Provider;
use store_api::logstore::LogStore;
use store_api::storage::{RegionId, SequenceNumber};

use crate::error::{Error, Result, WriteGroupSnafu};
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::KeyValues;
use crate::metrics;
use crate::region::version::{VersionControlData, VersionControlRef, VersionRef};
use crate::request::OptionOutputTx;
use crate::wal::{EntryId, WalWriter};

/// Notifies the waiter of the write result on drop.
struct WriteNotify {
    /// Error to send to the waiter, if the write fails.
    err: Option<Arc<Error>>,
    /// Sender to notify the waiter of the write result.
    sender: OptionOutputTx,
    /// Number of rows to be written.
    num_rows: usize,
}

impl WriteNotify {
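    /// Creates a new notify from the `sender` and the number of rows to write.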
    fn new(sender: OptionOutputTx, num_rows: usize) -> WriteNotify {
        WriteNotify {
            err: None,
            sender,
            num_rows,
        }
    }

    /// Sends the write result to the waiter.
    fn notify_result(&mut self) {
        if let Some(err) = &self.err {
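            // Try to send the error to the waiter.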
            self.sender
                .send_mut(Err(err.clone()).context(WriteGroupSnafu));
        } else {
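            // Send the number of rows written on success.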
            self.sender.send_mut(Ok(self.num_rows));
        }
    }
}

impl Drop for WriteNotify {
    fn drop(&mut self) {
        self.notify_result();
    }
}

/// Context to keep region metadata and buffer write requests.
pub(crate) struct RegionWriteCtx {
    /// Id of the region to write.
    region_id: RegionId,
    /// Version of the region while creating the context.
    version: VersionRef,
    /// VersionControl of the region.
    version_control: VersionControlRef,
    /// Next sequence number to write.
    ///
    /// The context assigns a unique sequence number for each row.
    next_sequence: SequenceNumber,
    /// Next entry id of the WAL to write.
    next_entry_id: EntryId,
    /// WAL entry to write.
    wal_entry: WalEntry,
    /// WAL provider of the region.
    provider: Provider,
    /// Notifiers to send write results to waiters.
    ///
    /// The i-th notify is for the i-th mutation.
    notifiers: Vec<WriteNotify>,
    /// Notifiers for bulk write requests.
    bulk_notifiers: Vec<WriteNotify>,
    /// Bulk parts to write to the memtable.
    pub(crate) bulk_parts: Vec<BulkPart>,
    /// Whether the write fails.
    failed: bool,

    /// Number of rows to put.
    pub(crate) put_num: usize,
    /// Number of rows to delete.
    pub(crate) delete_num: usize,
    /// Counter of bytes written, measured as the growth of the mutable memtable's
    /// memory usage.
    pub(crate) written_bytes: Option<Arc<AtomicU64>>,
}

impl RegionWriteCtx {
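    /// Returns a new context that writes after the committed sequence and last
    /// entry id of the region.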
    pub(crate) fn new(
        region_id: RegionId,
        version_control: &VersionControlRef,
        provider: Provider,
        written_bytes: Option<Arc<AtomicU64>>,
    ) -> RegionWriteCtx {
        let VersionControlData {
            version,
            committed_sequence,
            last_entry_id,
            ..
        } = version_control.current();

        RegionWriteCtx {
            region_id,
            version,
            version_control: version_control.clone(),
            next_sequence: committed_sequence + 1,
            next_entry_id: last_entry_id + 1,
            wal_entry: WalEntry::default(),
            provider,
            notifiers: Vec::new(),
            bulk_notifiers: Vec::new(),
            failed: false,
            put_num: 0,
            delete_num: 0,
            bulk_parts: Vec::new(),
            written_bytes,
        }
    }

    /// Pushes a mutation to the context and assigns sequence numbers to its rows.
    pub(crate) fn push_mutation(
        &mut self,
        op_type: i32,
        rows: Option<Rows>,
        write_hint: Option<WriteHint>,
        tx: OptionOutputTx,
    ) {
        let num_rows = rows.as_ref().map(|rows| rows.rows.len()).unwrap_or(0);
        self.wal_entry.mutations.push(Mutation {
            op_type,
            sequence: self.next_sequence,
            rows,
            write_hint,
        });

        let notify = WriteNotify::new(tx, num_rows);
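        // Notifiers are 1:1 mapped to mutations.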
        self.notifiers.push(notify);

        // Increase the sequence number.
        self.next_sequence += num_rows as u64;

        // Update metrics.
        match OpType::try_from(op_type) {
            Ok(OpType::Delete) => self.delete_num += num_rows,
            Ok(OpType::Put) => self.put_num += num_rows,
            Err(_) => (),
        }
    }

    /// Encodes the buffered mutations into the WAL writer and advances the next entry id.
    pub(crate) fn add_wal_entry<S: LogStore>(
        &mut self,
        wal_writer: &mut WalWriter<S>,
    ) -> Result<()> {
        wal_writer.add_entry(
            self.region_id,
            self.next_entry_id,
            &self.wal_entry,
            &self.provider,
        )?;
        self.next_entry_id += 1;
        Ok(())
    }

    /// Returns the version of the region when the context was created.
    pub(crate) fn version(&self) -> &VersionRef {
        &self.version
    }

    /// Sets the error to notify waiters and marks the write as failed.
    pub(crate) fn set_error(&mut self, err: Arc<Error>) {
        // Sets the error for all notifiers so they report it to the waiters.
        for notify in &mut self.notifiers {
            notify.err = Some(err.clone());
        }

        self.failed = true;
    }

    /// Sets the next entry id to write.
    pub(crate) fn set_next_entry_id(&mut self, next_entry_id: EntryId) {
        self.next_entry_id = next_entry_id;
    }

    /// Consumes the mutations and writes them into the mutable memtable.
    pub(crate) async fn write_memtable(&mut self) {
        debug_assert_eq!(self.notifiers.len(), self.wal_entry.mutations.len());

        if self.failed {
            return;
        }

        let mutable_memtable = self.version.memtables.mutable.clone();
        let prev_memory_usage = if self.written_bytes.is_some() {
            Some(mutable_memtable.memory_usage())
        } else {
            None
        };

        // Takes mutations from the wal entry and converts them into key values.
        let mutations = mem::take(&mut self.wal_entry.mutations)
            .into_iter()
            .enumerate()
            .filter_map(|(i, mutation)| {
                let kvs = KeyValues::new(&self.version.metadata, mutation)?;
                Some((i, kvs))
            })
            .collect::<Vec<_>>();

        if mutations.len() == 1 {
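            // Fast path: writes the only mutation in place without spawning a task.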
            if let Err(err) = mutable_memtable.write(&mutations[0].1) {
                self.notifiers[mutations[0].0].err = Some(Arc::new(err));
            }
        } else {
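            // Writes mutations concurrently on blocking threads.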
            let mut tasks = FuturesUnordered::new();
            for (i, kvs) in mutations {
                let mutable = mutable_memtable.clone();
                tasks.push(common_runtime::spawn_blocking_global(move || {
                    (i, mutable.write(&kvs))
                }));
            }

            while let Some(result) = tasks.next().await {
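                // The join handle only fails if the task panics, so unwrap it.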
                let (i, result) = result.unwrap();
                if let Err(err) = result {
                    self.notifiers[i].err = Some(Arc::new(err));
                }
            }
        }

        if let Some(written_bytes) = &self.written_bytes {
            let new_memory_usage = mutable_memtable.memory_usage();
            let bytes = new_memory_usage.saturating_sub(prev_memory_usage.unwrap_or_default());
            written_bytes.fetch_add(bytes as u64, Ordering::Relaxed);
        }
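        // Updates the region's sequence and entry id. Since the region stores the last
        // sequence and entry id, decrease `next_sequence` and `next_entry_id` by 1.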
        self.version_control
            .set_sequence_and_entry_id(self.next_sequence - 1, self.next_entry_id - 1);
    }

    /// Pushes a bulk part to the context and assigns its sequence number.
    ///
    /// Returns `false` and notifies the `sender` if the part cannot be converted
    /// into a WAL entry.
    pub(crate) fn push_bulk(&mut self, sender: OptionOutputTx, mut bulk: BulkPart) -> bool {
        bulk.sequence = self.next_sequence;
        let entry = match BulkWalEntry::try_from(&bulk) {
            Ok(entry) => entry,
            Err(e) => {
                sender.send(Err(e));
                return false;
            }
        };

        self.bulk_notifiers
            .push(WriteNotify::new(sender, bulk.num_rows()));

        self.wal_entry.bulk_entries.push(entry);
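        // Increase the sequence number by the number of rows in the part.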
        self.next_sequence += bulk.num_rows() as u64;
        self.bulk_parts.push(bulk);
        true
    }

    /// Writes the buffered bulk parts into the mutable memtable.
    pub(crate) async fn write_bulk(&mut self) {
        if self.failed || self.bulk_parts.is_empty() {
            return;
        }
        let _timer = metrics::REGION_WORKER_HANDLE_WRITE_ELAPSED
            .with_label_values(&["write_bulk"])
            .start_timer();

        let mutable_memtable = &self.version.memtables.mutable;
        let prev_memory_usage = if self.written_bytes.is_some() {
            Some(mutable_memtable.memory_usage())
        } else {
            None
        };

        if self.bulk_parts.len() == 1 {
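            // Fast path: writes the only part in place without spawning a task.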
            let part = self.bulk_parts.swap_remove(0);
            let num_rows = part.num_rows();
            if let Err(e) = self.version.memtables.mutable.write_bulk(part) {
                self.bulk_notifiers[0].err = Some(Arc::new(e));
            } else {
                self.put_num += num_rows;
            }
            return;
        }

        // Writes parts concurrently on blocking threads.
        let mut tasks = FuturesUnordered::new();
        for (i, part) in self.bulk_parts.drain(..).enumerate() {
            let mutable = mutable_memtable.clone();
            tasks.push(common_runtime::spawn_blocking_global(move || {
                let num_rows = part.num_rows();
                (i, mutable.write_bulk(part), num_rows)
            }));
        }
        while let Some(result) = tasks.next().await {
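            // The join handle only fails if the task panics, so unwrap it.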
            let (i, result, num_rows) = result.unwrap();
            if let Err(err) = result {
                self.bulk_notifiers[i].err = Some(Arc::new(err));
            } else {
                self.put_num += num_rows;
            }
        }

        if let Some(written_bytes) = &self.written_bytes {
            let new_memory_usage = mutable_memtable.memory_usage();
            let bytes = new_memory_usage.saturating_sub(prev_memory_usage.unwrap_or_default());
            written_bytes.fetch_add(bytes as u64, Ordering::Relaxed);
        }
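        // Updates the region's sequence and entry id. Since the region stores the last
        // sequence and entry id, decrease `next_sequence` and `next_entry_id` by 1.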
        self.version_control
            .set_sequence_and_entry_id(self.next_sequence - 1, self.next_entry_id - 1);
    }
}