//! Handling write requests.

use std::collections::{HashMap, hash_map};
use std::sync::Arc;

use api::v1::OpType;
use common_telemetry::{debug, error};
use common_wal::options::WalOptions;
use snafu::ensure;
use store_api::codec::PrimaryKeyEncoding;
use store_api::logstore::LogStore;
use store_api::storage::RegionId;

use crate::error::{
    InvalidRequestSnafu, PartitionExprVersionMismatchSnafu, RegionStateSnafu, RejectWriteSnafu,
    Result,
};
use crate::metrics;
use crate::metrics::{
    WRITE_REJECT_TOTAL, WRITE_ROWS_TOTAL, WRITE_STAGE_ELAPSED, WRITE_STALL_TOTAL,
};
use crate::region::{RegionLeaderState, RegionRoleState};
use crate::region_write_ctx::RegionWriteCtx;
use crate::request::{SenderBulkRequest, SenderWriteRequest, WriteRequest};
use crate::worker::RegionWorkerLoop;
impl<S: LogStore> RegionWorkerLoop<S> {
    /// Takes and handles all write requests.
    pub(crate) async fn handle_write_requests(
        &mut self,
        write_requests: &mut Vec<SenderWriteRequest>,
        bulk_requests: &mut Vec<SenderBulkRequest>,
        allow_stall: bool,
    ) {
        if write_requests.is_empty() && bulk_requests.is_empty() {
            return;
        }

        // Flushes this worker if the engine needs to flush.
        self.maybe_flush_worker();

        if self.should_reject_write() {
            // The memory pressure is too high, rejects the incoming requests
            // as well as all stalled requests.
            reject_write_requests(write_requests, bulk_requests);
            self.reject_stalled_requests();
            return;
        }

        // Stalls the requests instead of rejecting them while the write buffer is full.
        if self.write_buffer_manager.should_stall() && allow_stall {
            let stalled_count = (write_requests.len() + bulk_requests.len()) as i64;
            self.stalling_count.add(stalled_count);
            WRITE_STALL_TOTAL.inc_by(stalled_count as u64);
            self.stalled_requests.append(write_requests, bulk_requests);
            self.listener.on_write_stall();
            return;
        }

        // Prepares a write context for each region.
        let mut region_ctxs = {
            let _timer = WRITE_STAGE_ELAPSED
                .with_label_values(&["prepare_ctx"])
                .start_timer();
            self.prepare_region_write_ctx(write_requests, bulk_requests)
        };

        // Writes the WAL entries of all regions.
        {
            let _timer = WRITE_STAGE_ELAPSED
                .with_label_values(&["write_wal"])
                .start_timer();
            let mut wal_writer = self.wal.writer();
            for region_ctx in region_ctxs.values_mut() {
                if let WalOptions::Noop = &region_ctx.version().options.wal_options {
                    // The region doesn't write to the WAL.
                    continue;
                }
                if let Err(e) = region_ctx.add_wal_entry(&mut wal_writer).map_err(Arc::new) {
                    region_ctx.set_error(e);
                }
            }
            match wal_writer.write_to_wal().await.map_err(Arc::new) {
                Ok(response) => {
                    for (region_id, region_ctx) in region_ctxs.iter_mut() {
                        if let WalOptions::Noop = &region_ctx.version().options.wal_options {
                            continue;
                        }

                        // Safety: the WAL write succeeded, so the response must
                        // contain a last entry id for this region.
                        let last_entry_id = response.last_entry_ids.get(region_id).unwrap();
                        region_ctx.set_next_entry_id(last_entry_id + 1);
                    }
                }
                Err(e) => {
                    // Failed to write the WAL: notifies all regions and returns early.
                    for mut region_ctx in region_ctxs.into_values() {
                        region_ctx.set_error(e.clone());
                    }
                    return;
                }
            }
        }

        let (mut put_rows, mut delete_rows) = (0, 0);
        // Writes the mutations to the memtables.
        {
            let _timer = WRITE_STAGE_ELAPSED
                .with_label_values(&["write_memtable"])
                .start_timer();
            if region_ctxs.len() == 1 {
                // Fast path: a single region is written in the current task.
                let mut region_ctx = region_ctxs.into_values().next().unwrap();
                region_ctx.write_memtable().await;
                region_ctx.write_bulk().await;
                put_rows += region_ctx.put_num;
                delete_rows += region_ctx.delete_num;
            } else {
                // Writes multiple regions in parallel on the global runtime.
                let region_write_task = region_ctxs
                    .into_values()
                    .map(|mut region_ctx| {
                        common_runtime::spawn_global(async move {
                            region_ctx.write_memtable().await;
                            region_ctx.write_bulk().await;
                            (region_ctx.put_num, region_ctx.delete_num)
                        })
                    })
                    .collect::<Vec<_>>();

                for result in futures::future::join_all(region_write_task).await {
                    match result {
                        Ok((put, delete)) => {
                            put_rows += put;
                            delete_rows += delete;
                        }
                        Err(e) => {
                            error!(e; "unexpected error when joining region write tasks");
                        }
                    }
                }
            }
        }
        WRITE_ROWS_TOTAL
            .with_label_values(&["put"])
            .inc_by(put_rows as u64);
        WRITE_ROWS_TOTAL
            .with_label_values(&["delete"])
            .inc_by(delete_rows as u64);
    }

    /// Handles all stalled write requests.
    pub(crate) async fn handle_stalled_requests(&mut self) {
        // Retries the stalled requests without allowing them to stall again,
        // to avoid looping forever.
        let stalled = std::mem::take(&mut self.stalled_requests);
        self.stalling_count.sub(stalled.stalled_count() as i64);
        for (_, (_, mut requests, mut bulk)) in stalled.requests {
            self.handle_write_requests(&mut requests, &mut bulk, false)
                .await;
        }
    }

    /// Rejects all stalled write requests.
    pub(crate) fn reject_stalled_requests(&mut self) {
        let stalled = std::mem::take(&mut self.stalled_requests);
        self.stalling_count.sub(stalled.stalled_count() as i64);
        for (_, (_, mut requests, mut bulk)) in stalled.requests {
            reject_write_requests(&mut requests, &mut bulk);
        }
    }

    /// Rejects the stalled requests of a specific region.
    pub(crate) fn reject_region_stalled_requests(&mut self, region_id: &RegionId) {
        debug!("Rejects stalled requests for region {}", region_id);
        let (mut requests, mut bulk) = self.stalled_requests.remove(region_id);
        self.stalling_count
            .sub((requests.len() + bulk.len()) as i64);
        reject_write_requests(&mut requests, &mut bulk);
    }

    /// Retries the stalled requests of a specific region.
    pub(crate) async fn handle_region_stalled_requests(&mut self, region_id: &RegionId) {
        debug!("Handles stalled requests for region {}", region_id);
        let (mut requests, mut bulk) = self.stalled_requests.remove(region_id);
        self.stalling_count
            .sub((requests.len() + bulk.len()) as i64);
        self.handle_write_requests(&mut requests, &mut bulk, true)
            .await;
    }
}

impl<S> RegionWorkerLoop<S> {
    /// Validates the requests and groups them by region into write contexts.
    fn prepare_region_write_ctx(
        &mut self,
        write_requests: &mut Vec<SenderWriteRequest>,
        bulk_requests: &mut Vec<SenderBulkRequest>,
    ) -> HashMap<RegionId, RegionWriteCtx> {
        let mut region_ctxs = HashMap::new();
        self.process_write_requests(&mut region_ctxs, write_requests);
        self.process_bulk_requests(&mut region_ctxs, bulk_requests);
        region_ctxs
    }

    fn process_write_requests(
        &mut self,
        region_ctxs: &mut HashMap<RegionId, RegionWriteCtx>,
        write_requests: &mut Vec<SenderWriteRequest>,
    ) {
        for mut sender_req in write_requests.drain(..) {
            let region_id = sender_req.request.region_id;

            // If the region has pending DDLs, defers the write until they are done.
            if self.flush_scheduler.has_pending_ddls(region_id) {
                self.flush_scheduler
                    .add_write_request_to_pending(sender_req);
                continue;
            }

            // Initializes a write context for the region on first use.
            if let hash_map::Entry::Vacant(e) = region_ctxs.entry(region_id) {
                let Some(region) = self
                    .regions
                    .get_region_or(region_id, &mut sender_req.sender)
                else {
                    // The region doesn't exist; the sender has already been notified.
                    continue;
                };
                #[cfg(test)]
                debug!(
                    "Handling write request for region {}, state: {:?}",
                    region_id,
                    region.state()
                );
                match region.state() {
                    RegionRoleState::Leader(RegionLeaderState::Writable)
                    | RegionRoleState::Leader(RegionLeaderState::Staging) => {
                        if region.reject_all_writes_in_staging() {
                            sender_req
                                .sender
                                .send(RejectWriteSnafu { region_id }.fail());
                            continue;
                        }

                        let region_ctx = RegionWriteCtx::new(
                            region.region_id,
                            &region.version_control,
                            region.provider.clone(),
                            Some(region.written_bytes.clone()),
                        );

                        e.insert(region_ctx);
                    }
                    RegionRoleState::Leader(RegionLeaderState::Altering) => {
                        debug!(
                            "Region {} is altering, add request to pending writes",
                            region.region_id
                        );
                        self.stalling_count.add(1);
                        WRITE_STALL_TOTAL.inc();
                        self.stalled_requests.push(sender_req);
                        continue;
                    }
                    RegionRoleState::Leader(RegionLeaderState::EnteringStaging) => {
                        debug!(
                            "Region {} is entering staging, add request to pending writes",
                            region.region_id
                        );
                        self.stalling_count.add(1);
                        WRITE_STALL_TOTAL.inc();
                        self.stalled_requests.push(sender_req);
                        continue;
                    }
                    state => {
                        // The region is not in a writable state; notifies the sender.
                        sender_req.sender.send(
                            RegionStateSnafu {
                                region_id,
                                state,
                                expect: RegionRoleState::Leader(RegionLeaderState::Writable),
                            }
                            .fail(),
                        );
                        continue;
                    }
                }
            }

            // Safety: the context exists because we inserted it above if it was vacant.
            let region_ctx = region_ctxs.get_mut(&region_id).unwrap();
            let Some(region) = self
                .regions
                .get_region_or(region_id, &mut sender_req.sender)
            else {
                continue;
            };
            if region.reject_all_writes_in_staging() {
                sender_req
                    .sender
                    .send(RejectWriteSnafu { region_id }.fail());
                continue;
            }
            let expected_version = region.expected_partition_expr_version();
            if let Err(e) = check_partition_expr_version(
                region_id,
                expected_version,
                sender_req.request.partition_expr_version,
            ) {
                sender_req.sender.send(Err(e));
                continue;
            }

            if let Err(e) = check_op_type(
                region_ctx.version().options.append_mode,
                &sender_req.request,
            ) {
                sender_req.sender.send(Err(e));
                continue;
            }

            // Only fills missing columns if the schema versions differ or the
            // request carries no region metadata.
            let need_fill_missing_columns =
                if let Some(ref region_metadata) = sender_req.request.region_metadata {
                    region_ctx.version().metadata.schema_version != region_metadata.schema_version
                } else {
                    true
                };
            if need_fill_missing_columns
                && sender_req.request.primary_key_encoding() == PrimaryKeyEncoding::Dense
                && let Err(e) = sender_req
                    .request
                    .maybe_fill_missing_columns(&region_ctx.version().metadata)
            {
                sender_req.sender.send(Err(e));
                continue;
            }

            // Collects the request into the region's write context.
            region_ctx.push_mutation(
                sender_req.request.op_type as i32,
                Some(sender_req.request.rows),
                sender_req.request.hint,
                sender_req.sender,
                None,
            );
        }
    }

    fn process_bulk_requests(
        &mut self,
        region_ctxs: &mut HashMap<RegionId, RegionWriteCtx>,
        requests: &mut Vec<SenderBulkRequest>,
    ) {
        let _timer = metrics::REGION_WORKER_HANDLE_WRITE_ELAPSED
            .with_label_values(&["prepare_bulk_request"])
            .start_timer();
        for mut bulk_req in requests.drain(..) {
            let region_id = bulk_req.region_id;
            // If the region has pending DDLs, defers the bulk request until they are done.
            if self.flush_scheduler.has_pending_ddls(region_id) {
                self.flush_scheduler.add_bulk_request_to_pending(bulk_req);
                continue;
            }

            // Initializes a write context for the region on first use.
            if let hash_map::Entry::Vacant(e) = region_ctxs.entry(region_id) {
                let Some(region) = self.regions.get_region_or(region_id, &mut bulk_req.sender)
                else {
                    continue;
                };
                match region.state() {
                    RegionRoleState::Leader(RegionLeaderState::Writable)
                    | RegionRoleState::Leader(RegionLeaderState::Staging) => {
                        if region.reject_all_writes_in_staging() {
                            bulk_req.sender.send(RejectWriteSnafu { region_id }.fail());
                            continue;
                        }
                        let region_ctx = RegionWriteCtx::new(
                            region.region_id,
                            &region.version_control,
                            region.provider.clone(),
                            Some(region.written_bytes.clone()),
                        );

                        e.insert(region_ctx);
                    }
                    RegionRoleState::Leader(RegionLeaderState::Altering) => {
                        debug!(
                            "Region {} is altering, add request to pending writes",
                            region.region_id
                        );
                        self.stalling_count.add(1);
                        WRITE_STALL_TOTAL.inc();
                        self.stalled_requests.push_bulk(bulk_req);
                        continue;
                    }
                    state => {
                        // The region is not in a writable state; notifies the sender.
                        bulk_req.sender.send(
                            RegionStateSnafu {
                                region_id,
                                state,
                                expect: RegionRoleState::Leader(RegionLeaderState::Writable),
                            }
                            .fail(),
                        );
                        continue;
                    }
                }
            }

            // Safety: the context exists because we inserted it above if it was vacant.
            let region_ctx = region_ctxs.get_mut(&region_id).unwrap();
            let Some(region) = self.regions.get_region_or(region_id, &mut bulk_req.sender) else {
                continue;
            };
            if region.reject_all_writes_in_staging() {
                bulk_req.sender.send(RejectWriteSnafu { region_id }.fail());
                continue;
            }
            let expected_version = region.expected_partition_expr_version();
            if let Err(e) = check_partition_expr_version(
                region_id,
                expected_version,
                bulk_req.partition_expr_version,
            ) {
                bulk_req.sender.send(Err(e));
                continue;
            }

            // Only fills missing columns if the schema versions differ.
            let need_fill_missing_columns = region_ctx.version().metadata.schema_version
                != bulk_req.region_metadata.schema_version;

            if need_fill_missing_columns
                && let Err(e) = bulk_req
                    .request
                    .fill_missing_columns(&region_ctx.version().metadata)
            {
                bulk_req.sender.send(Err(e));
                continue;
            }

            if !region_ctx.push_bulk(bulk_req.sender, bulk_req.request, None) {
                return;
            }
        }
    }

    /// Returns whether the worker should reject incoming writes: the write
    /// buffer usage plus the estimated size of stalled requests has reached
    /// the global reject threshold.
    pub(crate) fn should_reject_write(&self) -> bool {
        self.write_buffer_manager.memory_usage() + self.stalled_requests.estimated_size
            >= self.config.global_write_buffer_reject_size.as_bytes() as usize
    }
}
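
// A minimal sketch of the predicate behind `should_reject_write`, modeled as a
// standalone function for illustration only; the real check reads the write
// buffer manager, the stalled-request size, and the engine config.
#[cfg(test)]
mod reject_threshold_tests {
    /// Hypothetical mirror of the reject predicate: writes are rejected once
    /// memtable usage plus stalled bytes reaches the configured limit.
    fn over_reject_threshold(memory_usage: usize, stalled_bytes: usize, reject_size: usize) -> bool {
        memory_usage + stalled_bytes >= reject_size
    }

    #[test]
    fn rejects_only_at_or_above_the_limit() {
        assert!(!over_reject_threshold(10, 5, 16));
        assert!(over_reject_threshold(10, 6, 16));
        assert!(over_reject_threshold(20, 0, 16));
    }
}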

/// Rejects all write and bulk requests, notifying their senders.
fn reject_write_requests(
    write_requests: &mut Vec<SenderWriteRequest>,
    bulk_requests: &mut Vec<SenderBulkRequest>,
) {
    WRITE_REJECT_TOTAL.inc_by(write_requests.len() as u64);

    for req in write_requests.drain(..) {
        req.sender.send(
            RejectWriteSnafu {
                region_id: req.request.region_id,
            }
            .fail(),
        );
    }
    for req in bulk_requests.drain(..) {
        let region_id = req.region_id;
        req.sender.send(RejectWriteSnafu { region_id }.fail());
    }
}

/// Rejects delete requests under append mode: append-only regions accept `Put` only.
fn check_op_type(append_mode: bool, request: &WriteRequest) -> Result<()> {
    if append_mode {
        ensure!(
            request.op_type == OpType::Put,
            InvalidRequestSnafu {
                region_id: request.region_id,
                reason: "DELETE is not allowed under append mode",
            }
        );
    }

    Ok(())
}
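
// A minimal sketch of the append-mode rule above, written as a hypothetical
// standalone predicate (not part of this module's API) for illustration:
// append mode admits `Put` operations and rejects everything else.
#[cfg(test)]
mod append_mode_tests {
    use api::v1::OpType;

    /// Hypothetical mirror of `check_op_type`'s rule.
    fn op_allowed(append_mode: bool, op_type: OpType) -> bool {
        !append_mode || op_type == OpType::Put
    }

    #[test]
    fn append_mode_only_accepts_put() {
        // Without append mode, both puts and deletes pass.
        assert!(op_allowed(false, OpType::Put));
        assert!(op_allowed(false, OpType::Delete));
        // Under append mode, deletes are rejected.
        assert!(op_allowed(true, OpType::Put));
        assert!(!op_allowed(true, OpType::Delete));
    }
}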

/// Checks that the request's partition expr version (if present) matches the
/// region's expected version; requests without a version are always accepted.
fn check_partition_expr_version(
    region_id: RegionId,
    expected_version: u64,
    request_version: Option<u64>,
) -> Result<()> {
    let request_version = match request_version {
        None => return Ok(()),
        Some(value) => value,
    };
    if request_version != expected_version {
        return PartitionExprVersionMismatchSnafu {
            region_id,
            request_version,
            expected_version,
        }
        .fail();
    }
    Ok(())
}
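
// A short sketch of `check_partition_expr_version`'s contract, assuming
// `RegionId::new(table_id, region_number)` from `store_api::storage`:
// requests that omit a version always pass, matching versions pass, and
// mismatches yield an error.
#[cfg(test)]
mod partition_expr_version_tests {
    use super::*;

    #[test]
    fn version_check_contract() {
        let region_id = RegionId::new(1, 1);
        // No version attached to the request: accepted.
        assert!(check_partition_expr_version(region_id, 3, None).is_ok());
        // Matching version: accepted.
        assert!(check_partition_expr_version(region_id, 3, Some(3)).is_ok());
        // Mismatched version: rejected.
        assert!(check_partition_expr_version(region_id, 3, Some(2)).is_err());
    }
}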