use std::mem;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};

use api::v1::{BulkWalEntry, Mutation, OpType, Rows, WalEntry, WriteHint};
use futures::stream::{FuturesUnordered, StreamExt};
use snafu::ResultExt;
use store_api::logstore::LogStore;
use store_api::logstore::provider::Provider;
use store_api::storage::{RegionId, SequenceNumber};

use crate::error::{Error, Result, WriteGroupSnafu};
use crate::memtable::KeyValues;
use crate::memtable::bulk::part::BulkPart;
use crate::metrics;
use crate::region::version::{VersionControlData, VersionControlRef, VersionRef};
use crate::request::OptionOutputTx;
use crate::wal::{EntryId, WalWriter};

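/// Notifier that sends the write result to its waiter when dropped.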
struct WriteNotify {
    /// Error to send to the waiter; `None` means the write succeeded.
    err: Option<Arc<Error>>,
    /// Sender to notify the waiter of the write result.
    sender: OptionOutputTx,
    /// Number of rows in this write, reported to the waiter on success.
    num_rows: usize,
}

impl WriteNotify {
    fn new(sender: OptionOutputTx, num_rows: usize) -> WriteNotify {
        WriteNotify {
            err: None,
            sender,
            num_rows,
        }
    }

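    /// Sends the stored result to the waiter.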
    fn notify_result(&mut self) {
        if let Some(err) = &self.err {
            // Try to send the error to the waiter.
            self.sender
                .send_mut(Err(err.clone()).context(WriteGroupSnafu));
        } else {
            // The write succeeded; report the number of written rows.
            self.sender.send_mut(Ok(self.num_rows));
        }
    }
}

impl Drop for WriteNotify {
    fn drop(&mut self) {
        // Notifies the waiter on drop so a result is always delivered,
        // even if no one sets one explicitly.
        self.notify_result();
    }
}

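/// Context that buffers the WAL entry, notifiers and statistics for a batch
/// of writes to a single region.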
pub(crate) struct RegionWriteCtx {
    /// Id of the region to write to.
    region_id: RegionId,
    /// Version of the region when the context was created.
    version: VersionRef,
    /// Version control of the region, used to publish the committed
    /// sequence and entry id once writes are done.
    version_control: VersionControlRef,
    /// Next sequence number to allocate; every row gets a unique sequence.
    next_sequence: SequenceNumber,
    /// Next WAL entry id to write.
    next_entry_id: EntryId,
    /// WAL entry that buffers all mutations of this batch.
    wal_entry: WalEntry,
    /// WAL provider of the region.
    provider: Provider,
    /// Notifiers for mutations; the i-th notifier belongs to the i-th mutation.
    notifiers: Vec<WriteNotify>,
    /// Notifiers for bulk writes; the i-th notifier belongs to the i-th bulk part.
    bulk_notifiers: Vec<WriteNotify>,
    /// Bulk parts to write to the memtable.
    pub(crate) bulk_parts: Vec<BulkPart>,
    /// Whether the write batch has failed; memtable writes are skipped if set.
    failed: bool,

    /// Number of rows to put.
    pub(crate) put_num: usize,
    /// Number of rows to delete.
    pub(crate) delete_num: usize,
    /// Counter of bytes written to the mutable memtable, if byte tracking is enabled.
    pub(crate) written_bytes: Option<Arc<AtomicU64>>,
}

impl RegionWriteCtx {
    /// Creates a context to write to the region `region_id`, starting from
    /// the region's committed sequence and last WAL entry id.
    pub(crate) fn new(
        region_id: RegionId,
        version_control: &VersionControlRef,
        provider: Provider,
        written_bytes: Option<Arc<AtomicU64>>,
    ) -> RegionWriteCtx {
        let VersionControlData {
            version,
            committed_sequence,
            last_entry_id,
            ..
        } = version_control.current();

        RegionWriteCtx {
            region_id,
            version,
            version_control: version_control.clone(),
            // Sequences and entry ids start from the next of the committed ones.
            next_sequence: committed_sequence + 1,
            next_entry_id: last_entry_id + 1,
            wal_entry: WalEntry::default(),
            provider,
            notifiers: Vec::new(),
            bulk_notifiers: Vec::new(),
            failed: false,
            put_num: 0,
            delete_num: 0,
            bulk_parts: Vec::new(),
            written_bytes,
        }
    }

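    /// Pushes a mutation to the WAL entry and registers a notifier for it.
    ///
    /// If `sequence` is given, it overrides the next sequence number of the context.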
    pub(crate) fn push_mutation(
        &mut self,
        op_type: i32,
        rows: Option<Rows>,
        write_hint: Option<WriteHint>,
        tx: OptionOutputTx,
        sequence: Option<SequenceNumber>,
    ) {
        if let Some(sequence) = sequence {
            self.next_sequence = sequence;
        }
        let num_rows = rows.as_ref().map(|rows| rows.rows.len()).unwrap_or(0);
        self.wal_entry.mutations.push(Mutation {
            op_type,
            sequence: self.next_sequence,
            rows,
            write_hint,
        });

        let notify = WriteNotify::new(tx, num_rows);
        // Notifiers are pushed in mutation order, so the i-th notify is for the i-th mutation.
        self.notifiers.push(notify);

        // Each row takes one sequence number.
        self.next_sequence += num_rows as u64;

        // Updates the row counters by operation type.
        match OpType::try_from(op_type) {
            Ok(OpType::Delete) => self.delete_num += num_rows,
            Ok(OpType::Put) => self.put_num += num_rows,
            Err(_) => (),
        }
    }

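    /// Adds the buffered WAL entry of this batch to the WAL writer and
    /// advances the next entry id.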
    pub(crate) fn add_wal_entry<S: LogStore>(
        &mut self,
        wal_writer: &mut WalWriter<S>,
    ) -> Result<()> {
        wal_writer.add_entry(
            self.region_id,
            self.next_entry_id,
            &self.wal_entry,
            &self.provider,
        )?;
        self.next_entry_id += 1;
        Ok(())
    }

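    /// Returns the region version of this context.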
    pub(crate) fn version(&self) -> &VersionRef {
        &self.version
    }

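    /// Marks the write as failed and stores the error for all pending
    /// notifiers so every waiter observes the failure.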
    pub(crate) fn set_error(&mut self, err: Arc<Error>) {
        // Sets the error for bulk notifiers as well; otherwise their waiters
        // would receive a spurious success on drop even though `failed` makes
        // `write_bulk()` skip the write.
        for notify in self
            .notifiers
            .iter_mut()
            .chain(self.bulk_notifiers.iter_mut())
        {
            notify.err = Some(err.clone());
        }

        self.failed = true;
    }

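    /// Overrides the next WAL entry id.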
    pub(crate) fn set_next_entry_id(&mut self, next_entry_id: EntryId) {
        self.next_entry_id = next_entry_id;
    }

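    /// Consumes the buffered mutations and writes them to the mutable
    /// memtable, then publishes the new committed sequence and entry id.
    /// Does nothing if the write already failed.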
    pub(crate) async fn write_memtable(&mut self) {
        debug_assert_eq!(self.notifiers.len(), self.wal_entry.mutations.len());

        if self.failed {
            return;
        }

        let mutable_memtable = self.version.memtables.mutable.clone();
        // Samples the memtable memory usage before the write so we can
        // account for the bytes this batch adds.
        let prev_memory_usage = if self.written_bytes.is_some() {
            Some(mutable_memtable.memory_usage())
        } else {
            None
        };

        // Takes the mutations and converts them to key-values, remembering the
        // index of each mutation to route errors back to its notifier.
        let mutations = mem::take(&mut self.wal_entry.mutations)
            .into_iter()
            .enumerate()
            .filter_map(|(i, mutation)| {
                let kvs = KeyValues::new(&self.version.metadata, mutation)?;
                Some((i, kvs))
            })
            .collect::<Vec<_>>();

        if mutations.len() == 1 {
            // Writes a single mutation in place to avoid spawning a task.
            if let Err(err) = mutable_memtable.write(&mutations[0].1) {
                self.notifiers[mutations[0].0].err = Some(Arc::new(err));
            }
        } else {
            // Writes mutations concurrently on the blocking runtime.
            let mut tasks = FuturesUnordered::new();
            for (i, kvs) in mutations {
                let mutable = mutable_memtable.clone();
                tasks.push(common_runtime::spawn_blocking_global(move || {
                    (i, mutable.write(&kvs))
                }));
            }

            while let Some(result) = tasks.next().await {
                // Unwraps the join result; the blocking task is never cancelled.
                let (i, result) = result.unwrap();
                if let Err(err) = result {
                    self.notifiers[i].err = Some(Arc::new(err));
                }
            }
        }

        if let Some(written_bytes) = &self.written_bytes {
            let new_memory_usage = mutable_memtable.memory_usage();
            let bytes = new_memory_usage.saturating_sub(prev_memory_usage.unwrap_or_default());
            written_bytes.fetch_add(bytes as u64, Ordering::Relaxed);
        }
        // The version control stores the last used sequence and entry id,
        // so decrease the `next_*` values by 1.
        self.version_control
            .set_sequence_and_entry_id(self.next_sequence - 1, self.next_entry_id - 1);
    }

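    /// Pushes a bulk part to write. Returns `false` and notifies the sender
    /// immediately if the part cannot be converted to a WAL entry.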
    pub(crate) fn push_bulk(
        &mut self,
        sender: OptionOutputTx,
        mut bulk: BulkPart,
        sequence: Option<SequenceNumber>,
    ) -> bool {
        if let Some(sequence) = sequence {
            self.next_sequence = sequence;
        }
        bulk.sequence = self.next_sequence;
        let entry = match BulkWalEntry::try_from(&bulk) {
            Ok(entry) => entry,
            Err(e) => {
                // Notifies the waiter of the conversion error immediately.
                sender.send(Err(e));
                return false;
            }
        };

        self.bulk_notifiers
            .push(WriteNotify::new(sender, bulk.num_rows()));

        self.wal_entry.bulk_entries.push(entry);
        // Each row takes one sequence number.
        self.next_sequence += bulk.num_rows() as u64;
        self.bulk_parts.push(bulk);
        true
    }

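    /// Writes the buffered bulk parts to the mutable memtable, then publishes
    /// the new committed sequence and entry id. Does nothing if the write
    /// already failed or there is no bulk part.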
    pub(crate) async fn write_bulk(&mut self) {
        if self.failed || self.bulk_parts.is_empty() {
            return;
        }
        let _timer = metrics::REGION_WORKER_HANDLE_WRITE_ELAPSED
            .with_label_values(&["write_bulk"])
            .start_timer();

        let mutable_memtable = &self.version.memtables.mutable;
        // Samples the memtable memory usage before the write so we can
        // account for the bytes this batch adds.
        let prev_memory_usage = if self.written_bytes.is_some() {
            Some(mutable_memtable.memory_usage())
        } else {
            None
        };

        if self.bulk_parts.len() == 1 {
            // Writes a single part in place to avoid spawning a task. Falls
            // through so the shared bookkeeping below still runs; an early
            // return here would skip the byte accounting and the sequence
            // and entry id update.
            let part = self.bulk_parts.swap_remove(0);
            let num_rows = part.num_rows();
            if let Err(e) = mutable_memtable.write_bulk(part) {
                self.bulk_notifiers[0].err = Some(Arc::new(e));
            } else {
                self.put_num += num_rows;
            }
        } else {
            // Writes bulk parts concurrently on the blocking runtime.
            let mut tasks = FuturesUnordered::new();
            for (i, part) in self.bulk_parts.drain(..).enumerate() {
                let mutable = mutable_memtable.clone();
                tasks.push(common_runtime::spawn_blocking_global(move || {
                    let num_rows = part.num_rows();
                    (i, mutable.write_bulk(part), num_rows)
                }));
            }
            while let Some(result) = tasks.next().await {
                // Unwraps the join result; the blocking task is never cancelled.
                let (i, result, num_rows) = result.unwrap();
                if let Err(err) = result {
                    self.bulk_notifiers[i].err = Some(Arc::new(err));
                } else {
                    self.put_num += num_rows;
                }
            }
        }

        if let Some(written_bytes) = &self.written_bytes {
            let new_memory_usage = mutable_memtable.memory_usage();
            let bytes = new_memory_usage.saturating_sub(prev_memory_usage.unwrap_or_default());
            written_bytes.fetch_add(bytes as u64, Ordering::Relaxed);
        }
        // The version control stores the last used sequence and entry id,
        // so decrease the `next_*` values by 1.
        self.version_control
            .set_sequence_and_entry_id(self.next_sequence - 1, self.next_entry_id - 1);
    }
}