1use std::collections::{HashMap, hash_map};
18use std::sync::Arc;
19
20use api::v1::OpType;
21use common_telemetry::{debug, error};
22use common_wal::options::WalOptions;
23use snafu::ensure;
24use store_api::codec::PrimaryKeyEncoding;
25use store_api::logstore::LogStore;
26use store_api::storage::RegionId;
27
28use crate::error::{
29 InvalidRequestSnafu, PartitionExprVersionMismatchSnafu, RegionNotFoundSnafu, RegionStateSnafu,
30 RejectWriteSnafu, Result,
31};
32use crate::metrics;
33use crate::metrics::{
34 WRITE_REJECT_TOTAL, WRITE_ROWS_TOTAL, WRITE_STAGE_ELAPSED, WRITE_STALL_TOTAL,
35};
36use crate::region::{RegionLeaderState, RegionRoleState};
37use crate::region_write_ctx::RegionWriteCtx;
38use crate::request::{SenderBulkRequest, SenderWriteRequest, WriteRequest};
39use crate::worker::RegionWorkerLoop;
40
impl<S: LogStore> RegionWorkerLoop<S> {
    /// Drains and handles the buffered `write_requests` and `bulk_requests`.
    ///
    /// Flow: maybe trigger a flush, reject everything if writes must be
    /// rejected, park the batch if the write buffer should stall (and
    /// `allow_stall` is true), otherwise write WAL entries first and then
    /// apply the mutations to the memtables. Request senders are notified
    /// through their per-region write contexts.
    pub(crate) async fn handle_write_requests(
        &mut self,
        write_requests: &mut Vec<SenderWriteRequest>,
        bulk_requests: &mut Vec<SenderBulkRequest>,
        allow_stall: bool,
    ) {
        if write_requests.is_empty() && bulk_requests.is_empty() {
            return;
        }

        // Give the worker a chance to schedule a flush before admitting more writes.
        self.maybe_flush_worker();

        // Hard limit reached: reject the incoming batch and everything
        // already stalled.
        if self.should_reject_write() {
            reject_write_requests(write_requests, bulk_requests);
            self.reject_stalled_requests();
            return;
        }

        // Soft limit: park the whole batch in `stalled_requests` and return.
        // Callers that re-drive stalled requests pass `allow_stall = false`
        // so requests cannot stall indefinitely.
        if self.write_buffer_manager.should_stall() && allow_stall {
            let stalled_count = (write_requests.len() + bulk_requests.len()) as i64;
            self.stalling_count.add(stalled_count);
            WRITE_STALL_TOTAL.inc_by(stalled_count as u64);
            self.stalled_requests.append(write_requests, bulk_requests);
            self.listener.on_write_stall();
            return;
        }

        // Validate requests and group them into one `RegionWriteCtx` per
        // region. Invalid requests are answered inside
        // `prepare_region_write_ctx` and never reach the contexts.
        let mut region_ctxs = {
            let _timer = WRITE_STAGE_ELAPSED
                .with_label_values(&["prepare_ctx"])
                .start_timer();
            self.prepare_region_write_ctx(write_requests, bulk_requests)
        };

        // Stage 1: write WAL entries.
        {
            let _timer = WRITE_STAGE_ELAPSED
                .with_label_values(&["write_wal"])
                .start_timer();
            let mut wal_writer = self.wal.writer();
            for region_ctx in region_ctxs.values_mut() {
                // Regions configured with a no-op WAL skip WAL encoding.
                if let WalOptions::Noop = &region_ctx.version().options.wal_options {
                    continue;
                }
                if let Err(e) = region_ctx.add_wal_entry(&mut wal_writer).map_err(Arc::new) {
                    // Encoding failure fails this region's pending requests.
                    region_ctx.set_error(e);
                }
            }
            match wal_writer.write_to_wal().await.map_err(Arc::new) {
                Ok(response) => {
                    for (region_id, region_ctx) in region_ctxs.iter_mut() {
                        if let WalOptions::Noop = &region_ctx.version().options.wal_options {
                            continue;
                        }

                        // Assumes the WAL response carries a last entry id for
                        // every region that wrote an entry — TODO confirm this
                        // invariant holds for all log store implementations.
                        let last_entry_id = response.last_entry_ids.get(region_id).unwrap();
                        region_ctx.set_next_entry_id(last_entry_id + 1);
                    }
                }
                Err(e) => {
                    // WAL write failed: record the error on every region
                    // context and skip the memtable stage entirely.
                    for mut region_ctx in region_ctxs.into_values() {
                        region_ctx.set_error(e.clone());
                    }
                    return;
                }
            }
        }

        // Stage 2: apply mutations to memtables and aggregate row counters.
        let (mut put_rows, mut delete_rows) = (0, 0);
        {
            let _timer = WRITE_STAGE_ELAPSED
                .with_label_values(&["write_memtable"])
                .start_timer();
            if region_ctxs.len() == 1 {
                // Single region: write inline, no task-spawn overhead.
                let mut region_ctx = region_ctxs.into_values().next().unwrap();
                region_ctx.write_memtable().await;
                region_ctx.write_bulk().await;
                put_rows += region_ctx.put_num;
                delete_rows += region_ctx.delete_num;
            } else {
                // Multiple regions: write each region's memtable concurrently
                // on the global runtime.
                let region_write_task = region_ctxs
                    .into_values()
                    .map(|mut region_ctx| {
                        common_runtime::spawn_global(async move {
                            region_ctx.write_memtable().await;
                            region_ctx.write_bulk().await;
                            (region_ctx.put_num, region_ctx.delete_num)
                        })
                    })
                    .collect::<Vec<_>>();

                for result in futures::future::join_all(region_write_task).await {
                    match result {
                        Ok((put, delete)) => {
                            put_rows += put;
                            delete_rows += delete;
                        }
                        Err(e) => {
                            // A join error means the spawned task panicked or
                            // was cancelled; log it and keep aggregating.
                            error!(e; "unexpected error when joining region write tasks");
                        }
                    }
                }
            }
        }
        WRITE_ROWS_TOTAL
            .with_label_values(&["put"])
            .inc_by(put_rows as u64);
        WRITE_ROWS_TOTAL
            .with_label_values(&["delete"])
            .inc_by(delete_rows as u64);
    }

    /// Takes all stalled requests and retries them with stalling disabled,
    /// so each request either completes or is answered with an error.
    pub(crate) async fn handle_stalled_requests(&mut self) {
        let stalled = std::mem::take(&mut self.stalled_requests);
        self.stalling_count.sub(stalled.stalled_count() as i64);
        for (_, (_, mut requests, mut bulk)) in stalled.requests {
            self.handle_write_requests(&mut requests, &mut bulk, false)
                .await;
        }
    }

    /// Rejects every stalled request, replying with a reject-write error.
    pub(crate) fn reject_stalled_requests(&mut self) {
        let stalled = std::mem::take(&mut self.stalled_requests);
        self.stalling_count.sub(stalled.stalled_count() as i64);
        for (_, (_, mut requests, mut bulk)) in stalled.requests {
            reject_write_requests(&mut requests, &mut bulk);
        }
    }

    /// Rejects the stalled requests of a single region.
    pub(crate) fn reject_region_stalled_requests(&mut self, region_id: &RegionId) {
        debug!("Rejects stalled requests for region {}", region_id);
        let (mut requests, mut bulk) = self.stalled_requests.remove(region_id);
        self.stalling_count
            .sub((requests.len() + bulk.len()) as i64);
        reject_write_requests(&mut requests, &mut bulk);
    }

    /// Fails the stalled requests of a region with a region-not-found error,
    /// presumably used after the region is closed or dropped — the callers
    /// are outside this view.
    pub(crate) fn fail_region_stalled_requests_as_not_found(&mut self, region_id: &RegionId) {
        debug!(
            "Fails stalled requests for region {} as region not found",
            region_id
        );
        let (requests, bulk) = self.stalled_requests.remove(region_id);
        self.stalling_count
            .sub((requests.len() + bulk.len()) as i64);

        for req in requests {
            req.sender.send(
                RegionNotFoundSnafu {
                    region_id: req.request.region_id,
                }
                .fail(),
            );
        }
        for req in bulk {
            req.sender.send(
                RegionNotFoundSnafu {
                    region_id: req.region_id,
                }
                .fail(),
            );
        }
    }

    /// Retries the stalled requests of a single region.
    pub(crate) async fn handle_region_stalled_requests(
        &mut self,
        region_id: &RegionId,
        allow_stall: bool,
    ) {
        debug!("Handles stalled requests for region {}", region_id);
        let (mut requests, mut bulk) = self.stalled_requests.remove(region_id);
        self.stalling_count
            .sub((requests.len() + bulk.len()) as i64);
        self.handle_write_requests(&mut requests, &mut bulk, allow_stall)
            .await;
    }

    /// Extracts the buffered requests that target `region_id` and handles
    /// only those; requests for other regions stay in the input buffers.
    pub(crate) async fn handle_buffered_region_write_requests(
        &mut self,
        region_id: &RegionId,
        write_requests: &mut Vec<SenderWriteRequest>,
        bulk_requests: &mut Vec<SenderBulkRequest>,
    ) {
        let mut current_region_write_requests = write_requests
            .extract_if(.., |r| r.request.region_id == *region_id)
            .collect::<Vec<_>>();

        let mut current_region_bulk_requests = bulk_requests
            .extract_if(.., |r| r.region_id == *region_id)
            .collect::<Vec<_>>();

        self.handle_write_requests(
            &mut current_region_write_requests,
            &mut current_region_bulk_requests,
            true,
        )
        .await;
    }
}
270
impl<S> RegionWorkerLoop<S> {
    /// Validates all buffered requests and groups them into one
    /// `RegionWriteCtx` per target region. Requests that fail validation are
    /// answered (and consumed) during processing and never reach a context.
    fn prepare_region_write_ctx(
        &mut self,
        write_requests: &mut Vec<SenderWriteRequest>,
        bulk_requests: &mut Vec<SenderBulkRequest>,
    ) -> HashMap<RegionId, RegionWriteCtx> {
        let mut region_ctxs = HashMap::new();
        self.process_write_requests(&mut region_ctxs, write_requests);
        self.process_bulk_requests(&mut region_ctxs, bulk_requests);
        region_ctxs
    }

    /// Drains `write_requests`, validating each request and pushing its
    /// mutation into the per-region write context. Every request ends up in a
    /// context, in a pending/stalled queue, or is answered with an error.
    fn process_write_requests(
        &mut self,
        region_ctxs: &mut HashMap<RegionId, RegionWriteCtx>,
        write_requests: &mut Vec<SenderWriteRequest>,
    ) {
        for mut sender_req in write_requests.drain(..) {
            let region_id = sender_req.request.region_id;

            // A pending DDL on the region must apply first; queue the write
            // behind it in the flush scheduler.
            if self.flush_scheduler.has_pending_ddls(region_id) {
                self.flush_scheduler
                    .add_write_request_to_pending(sender_req);
                continue;
            }

            // First request for this region in the batch: look the region up
            // and check its state before creating a write context.
            if let hash_map::Entry::Vacant(e) = region_ctxs.entry(region_id) {
                let Some(region) = self
                    .regions
                    .get_region_or(region_id, &mut sender_req.sender)
                else {
                    // `get_region_or` already replied to the sender.
                    continue;
                };
                #[cfg(test)]
                debug!(
                    "Handling write request for region {}, state: {:?}",
                    region_id,
                    region.state()
                );
                match region.state() {
                    RegionRoleState::Leader(RegionLeaderState::Writable)
                    | RegionRoleState::Leader(RegionLeaderState::Staging) => {
                        // A staging region may be configured to reject writes.
                        if region.reject_all_writes_in_staging() {
                            sender_req
                                .sender
                                .send(RejectWriteSnafu { region_id }.fail());
                            continue;
                        }

                        let region_ctx = RegionWriteCtx::new(
                            region.region_id,
                            &region.version_control,
                            region.provider.clone(),
                            Some(region.written_bytes.clone()),
                        );

                        e.insert(region_ctx);
                    }
                    RegionRoleState::Leader(RegionLeaderState::Altering)
                    | RegionRoleState::Leader(RegionLeaderState::Editing) => {
                        // Transient leader states: stall the request until
                        // the region becomes writable again.
                        debug!(
                            "Region {} is {:?}, add request to pending writes",
                            region.region_id,
                            region.state()
                        );
                        self.stalling_count.add(1);
                        WRITE_STALL_TOTAL.inc();
                        self.stalled_requests.push(sender_req);
                        continue;
                    }
                    RegionRoleState::Leader(RegionLeaderState::EnteringStaging) => {
                        debug!(
                            "Region {} is entering staging, add request to pending writes",
                            region.region_id
                        );
                        self.stalling_count.add(1);
                        WRITE_STALL_TOTAL.inc();
                        self.stalled_requests.push(sender_req);
                        continue;
                    }
                    state => {
                        // Any other state cannot accept writes; report the
                        // state mismatch to the sender.
                        sender_req.sender.send(
                            RegionStateSnafu {
                                region_id,
                                state,
                                expect: RegionRoleState::Leader(RegionLeaderState::Writable),
                            }
                            .fail(),
                        );
                        continue;
                    }
                }
            }

            // The entry exists now (inserted above or by an earlier request
            // in this batch), so the unwrap cannot fail.
            let region_ctx = region_ctxs.get_mut(&region_id).unwrap();
            // Re-fetch the region: later requests for the same region skip
            // the vacant-entry path, so re-validate availability here.
            let Some(region) = self
                .regions
                .get_region_or(region_id, &mut sender_req.sender)
            else {
                continue;
            };
            if region.reject_all_writes_in_staging() {
                sender_req
                    .sender
                    .send(RejectWriteSnafu { region_id }.fail());
                continue;
            }
            // The request's partition expr version (when present) must match
            // the region's expected version.
            let expected_version = region.expected_partition_expr_version();
            if let Err(e) = check_partition_expr_version(
                region_id,
                expected_version,
                sender_req.request.partition_expr_version,
            ) {
                sender_req.sender.send(Err(e));
                continue;
            }

            // Append-mode regions only accept Put operations.
            if let Err(e) = check_op_type(
                region_ctx.version().options.append_mode,
                &sender_req.request,
            ) {
                sender_req.sender.send(Err(e));

                continue;
            }

            // Missing columns need filling when the request's schema version
            // differs from the region's, or when the request carries no
            // region metadata at all.
            let need_fill_missing_columns =
                if let Some(ref region_metadata) = sender_req.request.region_metadata {
                    region_ctx.version().metadata.schema_version != region_metadata.schema_version
                } else {
                    true
                };
            // Filling is only attempted for dense primary-key encoding —
            // presumably sparse encoding handles this elsewhere; verify.
            if need_fill_missing_columns
                && sender_req.request.primary_key_encoding() == PrimaryKeyEncoding::Dense
                && let Err(e) = sender_req
                    .request
                    .maybe_fill_missing_columns(&region_ctx.version().metadata)
            {
                sender_req.sender.send(Err(e));

                continue;
            }

            // Collect the mutation; the caller writes WAL and memtable later.
            region_ctx.push_mutation(
                sender_req.request.op_type as i32,
                Some(sender_req.request.rows),
                sender_req.request.hint,
                sender_req.sender,
                None,
            );
        }
    }

    /// Drains bulk `requests`, validating each one and pushing it into the
    /// per-region write context (mirrors `process_write_requests`).
    fn process_bulk_requests(
        &mut self,
        region_ctxs: &mut HashMap<RegionId, RegionWriteCtx>,
        requests: &mut Vec<SenderBulkRequest>,
    ) {
        let _timer = metrics::REGION_WORKER_HANDLE_WRITE_ELAPSED
            .with_label_values(&["prepare_bulk_request"])
            .start_timer();
        for mut bulk_req in requests.drain(..) {
            let region_id = bulk_req.region_id;
            // A pending DDL must apply first; queue the bulk request.
            if self.flush_scheduler.has_pending_ddls(region_id) {
                self.flush_scheduler.add_bulk_request_to_pending(bulk_req);
                continue;
            }

            // First bulk request for this region in the batch: validate the
            // region state and create its write context.
            if let hash_map::Entry::Vacant(e) = region_ctxs.entry(region_id) {
                let Some(region) = self.regions.get_region_or(region_id, &mut bulk_req.sender)
                else {
                    continue;
                };
                match region.state() {
                    RegionRoleState::Leader(RegionLeaderState::Writable)
                    | RegionRoleState::Leader(RegionLeaderState::Staging) => {
                        if region.reject_all_writes_in_staging() {
                            bulk_req.sender.send(RejectWriteSnafu { region_id }.fail());
                            continue;
                        }
                        let region_ctx = RegionWriteCtx::new(
                            region.region_id,
                            &region.version_control,
                            region.provider.clone(),
                            Some(region.written_bytes.clone()),
                        );

                        e.insert(region_ctx);
                    }
                    RegionRoleState::Leader(RegionLeaderState::Altering)
                    | RegionRoleState::Leader(RegionLeaderState::Editing) => {
                        // Transient states: stall until writable again.
                        debug!(
                            "Region {} is {:?}, add request to pending writes",
                            region.region_id,
                            region.state()
                        );
                        self.stalling_count.add(1);
                        WRITE_STALL_TOTAL.inc();
                        self.stalled_requests.push_bulk(bulk_req);
                        continue;
                    }
                    state => {
                        bulk_req.sender.send(
                            RegionStateSnafu {
                                region_id,
                                state,
                                expect: RegionRoleState::Leader(RegionLeaderState::Writable),
                            }
                            .fail(),
                        );
                        continue;
                    }
                }
            }

            // Entry guaranteed present at this point.
            let region_ctx = region_ctxs.get_mut(&region_id).unwrap();
            // Re-fetch and re-validate (later requests skip the vacant path).
            let Some(region) = self.regions.get_region_or(region_id, &mut bulk_req.sender) else {
                continue;
            };
            if region.reject_all_writes_in_staging() {
                bulk_req.sender.send(RejectWriteSnafu { region_id }.fail());
                continue;
            }
            let expected_version = region.expected_partition_expr_version();
            if let Err(e) = check_partition_expr_version(
                region_id,
                expected_version,
                bulk_req.partition_expr_version,
            ) {
                bulk_req.sender.send(Err(e));
                continue;
            }

            // Bulk requests always carry region metadata; fill when the
            // schema versions differ.
            let need_fill_missing_columns = region_ctx.version().metadata.schema_version
                != bulk_req.region_metadata.schema_version;

            if need_fill_missing_columns
                && let Err(e) = bulk_req
                    .request
                    .fill_missing_columns(&region_ctx.version().metadata)
            {
                bulk_req.sender.send(Err(e));
                continue;
            }

            // NOTE(review): a `false` return aborts the whole batch and drops
            // the remaining drained requests (their senders get no reply) —
            // confirm this is intended rather than `continue`.
            if !region_ctx.push_bulk(bulk_req.sender, bulk_req.request, None) {
                return;
            }
        }
    }

    /// Returns true when the write buffer usage plus the estimated size of
    /// stalled requests reaches the configured global reject threshold.
    pub(crate) fn should_reject_write(&self) -> bool {
        self.write_buffer_manager.memory_usage() + self.stalled_requests.estimated_size
            >= self.config.global_write_buffer_reject_size.as_bytes() as usize
    }
}
555
556fn reject_write_requests(
558 write_requests: &mut Vec<SenderWriteRequest>,
559 bulk_requests: &mut Vec<SenderBulkRequest>,
560) {
561 WRITE_REJECT_TOTAL.inc_by(write_requests.len() as u64);
562
563 for req in write_requests.drain(..) {
564 req.sender.send(
565 RejectWriteSnafu {
566 region_id: req.request.region_id,
567 }
568 .fail(),
569 );
570 }
571 for req in bulk_requests.drain(..) {
572 let region_id = req.region_id;
573 req.sender.send(RejectWriteSnafu { region_id }.fail());
574 }
575}
576
577fn check_op_type(append_mode: bool, request: &WriteRequest) -> Result<()> {
579 if append_mode {
580 ensure!(
581 request.op_type == OpType::Put,
582 InvalidRequestSnafu {
583 region_id: request.region_id,
584 reason: "DELETE is not allowed under append mode",
585 }
586 );
587 }
588
589 Ok(())
590}
591
592fn check_partition_expr_version(
593 region_id: RegionId,
594 expected_version: u64,
595 request_version: Option<u64>,
596) -> Result<()> {
597 let request_version = match request_version {
598 None => return Ok(()),
599 Some(value) => value,
600 };
601 if request_version != expected_version {
602 return PartitionExprVersionMismatchSnafu {
603 region_id,
604 request_version,
605 expected_version,
606 }
607 .fail();
608 }
609 Ok(())
610}