use std::collections::{HashMap, hash_map};
use std::sync::Arc;

use api::v1::OpType;
use common_telemetry::{debug, error};
use common_wal::options::WalOptions;
use snafu::ensure;
use store_api::codec::PrimaryKeyEncoding;
use store_api::logstore::LogStore;
use store_api::storage::RegionId;

use crate::error::{InvalidRequestSnafu, RegionStateSnafu, RejectWriteSnafu, Result};
use crate::metrics;
use crate::metrics::{
    WRITE_REJECT_TOTAL, WRITE_ROWS_TOTAL, WRITE_STAGE_ELAPSED, WRITE_STALL_TOTAL,
};
use crate::region::{RegionLeaderState, RegionRoleState};
use crate::region_write_ctx::RegionWriteCtx;
use crate::request::{SenderBulkRequest, SenderWriteRequest, WriteRequest};
use crate::worker::RegionWorkerLoop;

impl<S: LogStore> RegionWorkerLoop<S> {
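    /// Takes and handles all write requests.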
    pub(crate) async fn handle_write_requests(
        &mut self,
        write_requests: &mut Vec<SenderWriteRequest>,
        bulk_requests: &mut Vec<SenderBulkRequest>,
        allow_stall: bool,
    ) {
        if write_requests.is_empty() && bulk_requests.is_empty() {
            return;
        }

        // Flushes the worker if necessary before admitting new writes.
        self.maybe_flush_worker();

        if self.should_reject_write() {
            // Memory pressure is too high, rejects the incoming write requests.
            reject_write_requests(write_requests, bulk_requests);
            // Also rejects requests that are already stalled.
            self.reject_stalled_requests();
            return;
        }

        if self.write_buffer_manager.should_stall() && allow_stall {
            let stalled_count = (write_requests.len() + bulk_requests.len()) as i64;
            self.stalling_count.add(stalled_count);
            WRITE_STALL_TOTAL.inc_by(stalled_count as u64);
            self.stalled_requests.append(write_requests, bulk_requests);
            self.listener.on_write_stall();
            return;
        }

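        // Prepares a write context for each region.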
        let mut region_ctxs = {
            let _timer = WRITE_STAGE_ELAPSED
                .with_label_values(&["prepare_ctx"])
                .start_timer();
            self.prepare_region_write_ctx(write_requests, bulk_requests)
        };
        {
            let _timer = WRITE_STAGE_ELAPSED
                .with_label_values(&["write_wal"])
                .start_timer();
            let mut wal_writer = self.wal.writer();
            for region_ctx in region_ctxs.values_mut() {
                if let WalOptions::Noop = &region_ctx.version().options.wal_options {
                    // The region skips the WAL entirely.
                    continue;
                }
                if let Err(e) = region_ctx.add_wal_entry(&mut wal_writer).map_err(Arc::new) {
                    region_ctx.set_error(e);
                }
            }
            match wal_writer.write_to_wal().await.map_err(Arc::new) {
                Ok(response) => {
                    for (region_id, region_ctx) in region_ctxs.iter_mut() {
                        if let WalOptions::Noop = &region_ctx.version().options.wal_options {
                            continue;
                        }

                        // Safety: the WAL write succeeded, so the response must contain
                        // the last entry id for this region.
                        let last_entry_id = response.last_entry_ids.get(region_id).unwrap();
                        region_ctx.set_next_entry_id(last_entry_id + 1);
                    }
                }
                Err(e) => {
                    // Failed to write the WAL: sets the error for every region and bails out.
                    for mut region_ctx in region_ctxs.into_values() {
                        region_ctx.set_error(e.clone());
                    }
                    return;
                }
            }
        }

        let (mut put_rows, mut delete_rows) = (0, 0);
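        // Writes data to memtables.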
        {
            let _timer = WRITE_STAGE_ELAPSED
                .with_label_values(&["write_memtable"])
                .start_timer();
            if region_ctxs.len() == 1 {
                // Fast path: a single region is written directly in the worker task.
                let mut region_ctx = region_ctxs.into_values().next().unwrap();
                region_ctx.write_memtable().await;
                region_ctx.write_bulk().await;
                put_rows += region_ctx.put_num;
                delete_rows += region_ctx.delete_num;
            } else {
                let region_write_task = region_ctxs
                    .into_values()
                    .map(|mut region_ctx| {
                        // Spawns a task per region so regions write their memtables in parallel.
                        common_runtime::spawn_global(async move {
                            region_ctx.write_memtable().await;
                            region_ctx.write_bulk().await;
                            (region_ctx.put_num, region_ctx.delete_num)
                        })
                    })
                    .collect::<Vec<_>>();

                for result in futures::future::join_all(region_write_task).await {
                    match result {
                        Ok((put, delete)) => {
                            put_rows += put;
                            delete_rows += delete;
                        }
                        Err(e) => {
                            error!(e; "unexpected error when joining region write tasks");
                        }
                    }
                }
            }
        }
        WRITE_ROWS_TOTAL
            .with_label_values(&["put"])
            .inc_by(put_rows as u64);
        WRITE_ROWS_TOTAL
            .with_label_values(&["delete"])
            .inc_by(delete_rows as u64);
    }

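    /// Handles all stalled write requests.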
    pub(crate) async fn handle_stalled_requests(&mut self) {
        // Takes and processes all stalled requests.
        let stalled = std::mem::take(&mut self.stalled_requests);
        self.stalling_count.sub(stalled.stalled_count() as i64);
        // These requests have already been stalled once, so don't stall them again.
        for (_, (_, mut requests, mut bulk)) in stalled.requests {
            self.handle_write_requests(&mut requests, &mut bulk, false)
                .await;
        }
    }

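    /// Rejects all stalled requests.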
    pub(crate) fn reject_stalled_requests(&mut self) {
        let stalled = std::mem::take(&mut self.stalled_requests);
        self.stalling_count.sub(stalled.stalled_count() as i64);
        for (_, (_, mut requests, mut bulk)) in stalled.requests {
            reject_write_requests(&mut requests, &mut bulk);
        }
    }

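    /// Rejects stalled requests for the given region.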
    pub(crate) fn reject_region_stalled_requests(&mut self, region_id: &RegionId) {
        debug!("Rejects stalled requests for region {}", region_id);
        let (mut requests, mut bulk) = self.stalled_requests.remove(region_id);
        self.stalling_count
            .sub((requests.len() + bulk.len()) as i64);
        reject_write_requests(&mut requests, &mut bulk);
    }

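    /// Handles stalled requests for the given region.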
    pub(crate) async fn handle_region_stalled_requests(&mut self, region_id: &RegionId) {
        debug!("Handles stalled requests for region {}", region_id);
        let (mut requests, mut bulk) = self.stalled_requests.remove(region_id);
        self.stalling_count
            .sub((requests.len() + bulk.len()) as i64);
        self.handle_write_requests(&mut requests, &mut bulk, true)
            .await;
    }
}

impl<S> RegionWorkerLoop<S> {
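    /// Validates requests and builds a [`RegionWriteCtx`] for each region to write to.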
    fn prepare_region_write_ctx(
        &mut self,
        write_requests: &mut Vec<SenderWriteRequest>,
        bulk_requests: &mut Vec<SenderBulkRequest>,
    ) -> HashMap<RegionId, RegionWriteCtx> {
        let mut region_ctxs = HashMap::new();
        self.process_write_requests(&mut region_ctxs, write_requests);
        self.process_bulk_requests(&mut region_ctxs, bulk_requests);
        region_ctxs
    }

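    /// Processes write requests and fills `region_ctxs`.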
    fn process_write_requests(
        &mut self,
        region_ctxs: &mut HashMap<RegionId, RegionWriteCtx>,
        write_requests: &mut Vec<SenderWriteRequest>,
    ) {
        for mut sender_req in write_requests.drain(..) {
            let region_id = sender_req.request.region_id;

            // If the region has pending DDL requests, defers the write until they are done.
            if self.flush_scheduler.has_pending_ddls(region_id) {
                self.flush_scheduler
                    .add_write_request_to_pending(sender_req);
                continue;
            }

            // Checks whether the region exists and is writable.
            if let hash_map::Entry::Vacant(e) = region_ctxs.entry(region_id) {
                let Some(region) = self
                    .regions
                    .get_region_or(region_id, &mut sender_req.sender)
                else {
                    // The region doesn't exist; the error has already been sent to the sender.
                    continue;
                };
                #[cfg(test)]
                debug!(
                    "Handling write request for region {}, state: {:?}",
                    region_id,
                    region.state()
                );
                match region.state() {
                    RegionRoleState::Leader(RegionLeaderState::Writable)
                    | RegionRoleState::Leader(RegionLeaderState::Staging) => {
                        let region_ctx = RegionWriteCtx::new(
                            region.region_id,
                            &region.version_control,
                            region.provider.clone(),
                            Some(region.written_bytes.clone()),
                        );

                        e.insert(region_ctx);
                    }
                    RegionRoleState::Leader(RegionLeaderState::Altering) => {
                        debug!(
                            "Region {} is altering, add request to pending writes",
                            region.region_id
                        );
                        self.stalling_count.add(1);
                        WRITE_STALL_TOTAL.inc();
                        self.stalled_requests.push(sender_req);
                        continue;
                    }
                    RegionRoleState::Leader(RegionLeaderState::EnteringStaging) => {
                        debug!(
                            "Region {} is entering staging, add request to pending writes",
                            region.region_id
                        );
                        self.stalling_count.add(1);
                        WRITE_STALL_TOTAL.inc();
                        self.stalled_requests.push(sender_req);
                        continue;
                    }
                    state => {
                        // The region isn't writable, responds with a region state error.
                        sender_req.sender.send(
                            RegionStateSnafu {
                                region_id,
                                state,
                                expect: RegionRoleState::Leader(RegionLeaderState::Writable),
                            }
                            .fail(),
                        );
                        continue;
                    }
                }
            }

            // Safety: the context must exist; it was just inserted above if it was missing.
            let region_ctx = region_ctxs.get_mut(&region_id).unwrap();

            // Checks whether the request's op type is allowed under the region's options.
            if let Err(e) = check_op_type(
                region_ctx.version().options.append_mode,
                &sender_req.request,
            ) {
                // The request is invalid, responds with the error.
                sender_req.sender.send(Err(e));

                continue;
            }

            // Only fills missing columns if the schema version of the request differs
            // from the region's, or the request carries no region metadata.
            let need_fill_missing_columns =
                if let Some(ref region_metadata) = sender_req.request.region_metadata {
                    region_ctx.version().metadata.schema_version != region_metadata.schema_version
                } else {
                    true
                };
            // Only fills missing columns for requests with dense primary key encoding.
            if need_fill_missing_columns
                && sender_req.request.primary_key_encoding() == PrimaryKeyEncoding::Dense
                && let Err(e) = sender_req
                    .request
                    .maybe_fill_missing_columns(&region_ctx.version().metadata)
            {
                sender_req.sender.send(Err(e));

                continue;
            }

            // Collects the request into the region's write context.
            region_ctx.push_mutation(
                sender_req.request.op_type as i32,
                Some(sender_req.request.rows),
                sender_req.request.hint,
                sender_req.sender,
                None,
            );
        }
    }

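    /// Processes bulk insert requests and fills `region_ctxs`.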
    fn process_bulk_requests(
        &mut self,
        region_ctxs: &mut HashMap<RegionId, RegionWriteCtx>,
        requests: &mut Vec<SenderBulkRequest>,
    ) {
        let _timer = metrics::REGION_WORKER_HANDLE_WRITE_ELAPSED
            .with_label_values(&["prepare_bulk_request"])
            .start_timer();
        for mut bulk_req in requests.drain(..) {
            let region_id = bulk_req.region_id;
            // If the region has pending DDL requests, defers the bulk request until they are done.
            if self.flush_scheduler.has_pending_ddls(region_id) {
                self.flush_scheduler.add_bulk_request_to_pending(bulk_req);
                continue;
            }

            // Checks whether the region exists and is writable.
            if let hash_map::Entry::Vacant(e) = region_ctxs.entry(region_id) {
                let Some(region) = self.regions.get_region_or(region_id, &mut bulk_req.sender)
                else {
                    continue;
                };
                match region.state() {
                    RegionRoleState::Leader(RegionLeaderState::Writable) => {
                        let region_ctx = RegionWriteCtx::new(
                            region.region_id,
                            &region.version_control,
                            region.provider.clone(),
                            Some(region.written_bytes.clone()),
                        );

                        e.insert(region_ctx);
                    }
                    RegionRoleState::Leader(RegionLeaderState::Altering) => {
                        debug!(
                            "Region {} is altering, add request to pending writes",
                            region.region_id
                        );
                        self.stalling_count.add(1);
                        WRITE_STALL_TOTAL.inc();
                        self.stalled_requests.push_bulk(bulk_req);
                        continue;
                    }
                    state => {
                        // The region isn't writable, responds with a region state error.
                        bulk_req.sender.send(
                            RegionStateSnafu {
                                region_id,
                                state,
                                expect: RegionRoleState::Leader(RegionLeaderState::Writable),
                            }
                            .fail(),
                        );
                        continue;
                    }
                }
            }

            // Safety: the context must exist; it was just inserted above if it was missing.
            let region_ctx = region_ctxs.get_mut(&region_id).unwrap();

            let need_fill_missing_columns = region_ctx.version().metadata.schema_version
                != bulk_req.region_metadata.schema_version;

            if need_fill_missing_columns
                && let Err(e) = bulk_req
                    .request
                    .fill_missing_columns(&region_ctx.version().metadata)
            {
                bulk_req.sender.send(Err(e));
                continue;
            }

            if !region_ctx.push_bulk(bulk_req.sender, bulk_req.request, None) {
                return;
            }
        }
    }

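    /// Returns whether the engine should reject some write requests.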
    pub(crate) fn should_reject_write(&self) -> bool {
        // Rejects writes if the memory usage of the write buffer plus the size of the
        // stalled requests exceeds the global reject threshold.
        self.write_buffer_manager.memory_usage() + self.stalled_requests.estimated_size
            >= self.config.global_write_buffer_reject_size.as_bytes() as usize
    }
}

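/// Rejects write and bulk requests, sending a reject error response to each sender.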
fn reject_write_requests(
    write_requests: &mut Vec<SenderWriteRequest>,
    bulk_requests: &mut Vec<SenderBulkRequest>,
) {
    WRITE_REJECT_TOTAL.inc_by(write_requests.len() as u64);

    for req in write_requests.drain(..) {
        req.sender.send(
            RejectWriteSnafu {
                region_id: req.request.region_id,
            }
            .fail(),
        );
    }
    for req in bulk_requests.drain(..) {
        let region_id = req.region_id;
        req.sender.send(RejectWriteSnafu { region_id }.fail());
    }
}

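/// Checks the op type of the request. Only `Put` is allowed when append mode is enabled.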
fn check_op_type(append_mode: bool, request: &WriteRequest) -> Result<()> {
    if append_mode {
        ensure!(
            request.op_type == OpType::Put,
            InvalidRequestSnafu {
                region_id: request.region_id,
                reason: "DELETE is not allowed under append mode",
            }
        );
    }

    Ok(())
}