use std::collections::{hash_map, HashMap};
use std::sync::Arc;

use api::v1::OpType;
use common_telemetry::{debug, error};
use common_wal::options::WalOptions;
use snafu::ensure;
use store_api::codec::PrimaryKeyEncoding;
use store_api::logstore::LogStore;
use store_api::storage::RegionId;

use crate::error::{InvalidRequestSnafu, RegionStateSnafu, RejectWriteSnafu, Result};
use crate::metrics;
use crate::metrics::{
    WRITE_REJECT_TOTAL, WRITE_ROWS_TOTAL, WRITE_STAGE_ELAPSED, WRITE_STALL_TOTAL,
};
use crate::region::{RegionLeaderState, RegionRoleState};
use crate::region_write_ctx::RegionWriteCtx;
use crate::request::{SenderBulkRequest, SenderWriteRequest, WriteRequest};
use crate::worker::RegionWorkerLoop;

impl<S: LogStore> RegionWorkerLoop<S> {
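    /// Takes and handles all write requests.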
    pub(crate) async fn handle_write_requests(
        &mut self,
        write_requests: &mut Vec<SenderWriteRequest>,
        bulk_requests: &mut Vec<SenderBulkRequest>,
        allow_stall: bool,
    ) {
        if write_requests.is_empty() && bulk_requests.is_empty() {
            return;
        }

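        // Flush the worker if necessary before handling the writes.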
        self.maybe_flush_worker();

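        // Reject all requests (including stalled ones) if the write buffer usage reaches the reject threshold.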
        if self.should_reject_write() {
            reject_write_requests(write_requests, bulk_requests);
            self.reject_stalled_requests();
            return;
        }

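        // Stall the requests if the write buffer should stall and stalling is allowed.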
        if self.write_buffer_manager.should_stall() && allow_stall {
            let stalled_count = (write_requests.len() + bulk_requests.len()) as i64;
            self.stalling_count.add(stalled_count);
            WRITE_STALL_TOTAL.inc_by(stalled_count as u64);
            self.stalled_requests.append(write_requests, bulk_requests);
            self.listener.on_write_stall();
            return;
        }

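        // Prepare a write context for each region in the requests.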
        let mut region_ctxs = {
            let _timer = WRITE_STAGE_ELAPSED
                .with_label_values(&["prepare_ctx"])
                .start_timer();
            self.prepare_region_write_ctx(write_requests, bulk_requests)
        };

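        // Write the mutations of each region to the WAL.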
        {
            let _timer = WRITE_STAGE_ELAPSED
                .with_label_values(&["write_wal"])
                .start_timer();
            let mut wal_writer = self.wal.writer();
            for region_ctx in region_ctxs.values_mut() {
                if let WalOptions::Noop = &region_ctx.version().options.wal_options {
                    continue;
                }
                if let Err(e) = region_ctx.add_wal_entry(&mut wal_writer).map_err(Arc::new) {
                    region_ctx.set_error(e);
                }
            }
            match wal_writer.write_to_wal().await.map_err(Arc::new) {
                Ok(response) => {
                    for (region_id, region_ctx) in region_ctxs.iter_mut() {
                        if let WalOptions::Noop = &region_ctx.version().options.wal_options {
                            continue;
                        }

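                        // The WAL response should contain the last entry id of every region that wrote entries.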
                        let last_entry_id = response.last_entry_ids.get(region_id).unwrap();
                        region_ctx.set_next_entry_id(last_entry_id + 1);
                    }
                }
                Err(e) => {
                    for mut region_ctx in region_ctxs.into_values() {
                        region_ctx.set_error(e.clone());
                    }
                    return;
                }
            }
        }

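        // Write the mutations to the memtables; spawn one task per region when writing multiple regions.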
        let (mut put_rows, mut delete_rows) = (0, 0);
        {
            let _timer = WRITE_STAGE_ELAPSED
                .with_label_values(&["write_memtable"])
                .start_timer();
            if region_ctxs.len() == 1 {
                let mut region_ctx = region_ctxs.into_values().next().unwrap();
                region_ctx.write_memtable().await;
                region_ctx.write_bulk().await;
                put_rows += region_ctx.put_num;
                delete_rows += region_ctx.delete_num;
            } else {
                let region_write_task = region_ctxs
                    .into_values()
                    .map(|mut region_ctx| {
                        common_runtime::spawn_global(async move {
                            region_ctx.write_memtable().await;
                            region_ctx.write_bulk().await;
                            (region_ctx.put_num, region_ctx.delete_num)
                        })
                    })
                    .collect::<Vec<_>>();

                for result in futures::future::join_all(region_write_task).await {
                    match result {
                        Ok((put, delete)) => {
                            put_rows += put;
                            delete_rows += delete;
                        }
                        Err(e) => {
                            error!(e; "unexpected error when joining region write tasks");
                        }
                    }
                }
            }
        }
        WRITE_ROWS_TOTAL
            .with_label_values(&["put"])
            .inc_by(put_rows as u64);
        WRITE_ROWS_TOTAL
            .with_label_values(&["delete"])
            .inc_by(delete_rows as u64);
    }

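    /// Handles all stalled write requests.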
    pub(crate) async fn handle_stalled_requests(&mut self) {
        let stalled = std::mem::take(&mut self.stalled_requests);
        self.stalling_count.sub(stalled.stalled_count() as i64);
        for (_, (_, mut requests, mut bulk)) in stalled.requests {
            self.handle_write_requests(&mut requests, &mut bulk, false)
                .await;
        }
    }

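    /// Rejects all stalled write requests.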
    pub(crate) fn reject_stalled_requests(&mut self) {
        let stalled = std::mem::take(&mut self.stalled_requests);
        self.stalling_count.sub(stalled.stalled_count() as i64);
        for (_, (_, mut requests, mut bulk)) in stalled.requests {
            reject_write_requests(&mut requests, &mut bulk);
        }
    }

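    /// Rejects the stalled requests of a specific region.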
    pub(crate) fn reject_region_stalled_requests(&mut self, region_id: &RegionId) {
        debug!("Rejects stalled requests for region {}", region_id);
        let (mut requests, mut bulk) = self.stalled_requests.remove(region_id);
        self.stalling_count
            .sub((requests.len() + bulk.len()) as i64);
        reject_write_requests(&mut requests, &mut bulk);
    }

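    /// Handles the stalled requests of a specific region.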
    pub(crate) async fn handle_region_stalled_requests(&mut self, region_id: &RegionId) {
        debug!("Handles stalled requests for region {}", region_id);
        let (mut requests, mut bulk) = self.stalled_requests.remove(region_id);
        self.stalling_count
            .sub((requests.len() + bulk.len()) as i64);
        self.handle_write_requests(&mut requests, &mut bulk, true)
            .await;
    }
}

impl<S> RegionWorkerLoop<S> {
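    /// Validates the requests and groups them into a write context per region.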
    fn prepare_region_write_ctx(
        &mut self,
        write_requests: &mut Vec<SenderWriteRequest>,
        bulk_requests: &mut Vec<SenderBulkRequest>,
    ) -> HashMap<RegionId, RegionWriteCtx> {
        let mut region_ctxs = HashMap::new();
        self.process_write_requests(&mut region_ctxs, write_requests);
        self.process_bulk_requests(&mut region_ctxs, bulk_requests);
        region_ctxs
    }

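    /// Checks each write request and pushes it into the write context of its region.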
    fn process_write_requests(
        &mut self,
        region_ctxs: &mut HashMap<RegionId, RegionWriteCtx>,
        write_requests: &mut Vec<SenderWriteRequest>,
    ) {
        for mut sender_req in write_requests.drain(..) {
            let region_id = sender_req.request.region_id;

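            // If the region has pending DDLs, queue the write request behind them.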
            if self.flush_scheduler.has_pending_ddls(region_id) {
                self.flush_scheduler
                    .add_write_request_to_pending(sender_req);
                continue;
            }

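            // Create a write context for the region on its first request in this batch.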
            if let hash_map::Entry::Vacant(e) = region_ctxs.entry(region_id) {
                let Some(region) = self
                    .regions
                    .get_region_or(region_id, &mut sender_req.sender)
                else {
                    continue;
                };
                match region.state() {
                    RegionRoleState::Leader(RegionLeaderState::Writable) => {
                        let region_ctx = RegionWriteCtx::new(
                            region.region_id,
                            &region.version_control,
                            region.provider.clone(),
                        );

                        e.insert(region_ctx);
                    }
                    RegionRoleState::Leader(RegionLeaderState::Altering) => {
                        debug!(
                            "Region {} is altering, add request to pending writes",
                            region.region_id
                        );
                        self.stalling_count.add(1);
                        WRITE_STALL_TOTAL.inc();
                        self.stalled_requests.push(sender_req);
                        continue;
                    }
                    state => {
                        sender_req.sender.send(
                            RegionStateSnafu {
                                region_id,
                                state,
                                expect: RegionRoleState::Leader(RegionLeaderState::Writable),
                            }
                            .fail(),
                        );
                        continue;
                    }
                }
            }

            let region_ctx = region_ctxs.get_mut(&region_id).unwrap();

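            // Reject the request if its op type is not allowed (e.g. DELETE under append mode).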
            if let Err(e) = check_op_type(
                region_ctx.version().options.append_mode,
                &sender_req.request,
            ) {
                sender_req.sender.send(Err(e));

                continue;
            }

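            // Fill missing columns if the request schema version differs from the region's (dense primary key encoding only).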
            let need_fill_missing_columns =
                if let Some(ref region_metadata) = sender_req.request.region_metadata {
                    region_ctx.version().metadata.schema_version != region_metadata.schema_version
                } else {
                    true
                };
            if need_fill_missing_columns
                && sender_req.request.primary_key_encoding() == PrimaryKeyEncoding::Dense
            {
                if let Err(e) = sender_req
                    .request
                    .maybe_fill_missing_columns(&region_ctx.version().metadata)
                {
                    sender_req.sender.send(Err(e));

                    continue;
                }
            }

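            // Collect the mutation into the region's write context.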
            region_ctx.push_mutation(
                sender_req.request.op_type as i32,
                Some(sender_req.request.rows),
                sender_req.request.hint,
                sender_req.sender,
            );
        }
    }

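    /// Checks each bulk request and pushes it into the write context of its region.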
    fn process_bulk_requests(
        &mut self,
        region_ctxs: &mut HashMap<RegionId, RegionWriteCtx>,
        requests: &mut Vec<SenderBulkRequest>,
    ) {
        let _timer = metrics::REGION_WORKER_HANDLE_WRITE_ELAPSED
            .with_label_values(&["prepare_bulk_request"])
            .start_timer();
        for mut bulk_req in requests.drain(..) {
            let region_id = bulk_req.region_id;
            if self.flush_scheduler.has_pending_ddls(region_id) {
                self.flush_scheduler.add_bulk_request_to_pending(bulk_req);
                continue;
            }

            if let hash_map::Entry::Vacant(e) = region_ctxs.entry(region_id) {
                let Some(region) = self.regions.get_region_or(region_id, &mut bulk_req.sender)
                else {
                    continue;
                };
                match region.state() {
                    RegionRoleState::Leader(RegionLeaderState::Writable) => {
                        let region_ctx = RegionWriteCtx::new(
                            region.region_id,
                            &region.version_control,
                            region.provider.clone(),
                        );

                        e.insert(region_ctx);
                    }
                    RegionRoleState::Leader(RegionLeaderState::Altering) => {
                        debug!(
                            "Region {} is altering, add request to pending writes",
                            region.region_id
                        );
                        self.stalling_count.add(1);
                        WRITE_STALL_TOTAL.inc();
                        self.stalled_requests.push_bulk(bulk_req);
                        continue;
                    }
                    state => {
                        bulk_req.sender.send(
                            RegionStateSnafu {
                                region_id,
                                state,
                                expect: RegionRoleState::Leader(RegionLeaderState::Writable),
                            }
                            .fail(),
                        );
                        continue;
                    }
                }
            }

            let region_ctx = region_ctxs.get_mut(&region_id).unwrap();

            let need_fill_missing_columns = region_ctx.version().metadata.schema_version
                != bulk_req.region_metadata.schema_version;

            if need_fill_missing_columns {
                bulk_req.sender.send(
                    InvalidRequestSnafu {
                        region_id,
                        reason: "Schema mismatch",
                    }
                    .fail(),
                );
                continue;
            }

            if !region_ctx.push_bulk(bulk_req.sender, bulk_req.request) {
                continue;
            }
        }
    }

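    /// Returns whether the worker should reject new write requests.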
    pub(crate) fn should_reject_write(&self) -> bool {
        self.write_buffer_manager.memory_usage() + self.stalled_requests.estimated_size
            >= self.config.global_write_buffer_reject_size.as_bytes() as usize
    }
}

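/// Sends a rejection error to every write and bulk request.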
fn reject_write_requests(
    write_requests: &mut Vec<SenderWriteRequest>,
    bulk_requests: &mut Vec<SenderBulkRequest>,
) {
    WRITE_REJECT_TOTAL.inc_by(write_requests.len() as u64);

    for req in write_requests.drain(..) {
        req.sender.send(
            RejectWriteSnafu {
                region_id: req.request.region_id,
            }
            .fail(),
        );
    }
    for req in bulk_requests.drain(..) {
        let region_id = req.region_id;
        req.sender.send(RejectWriteSnafu { region_id }.fail());
    }
}

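/// Rejects DELETE requests when the region is in append mode.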
fn check_op_type(append_mode: bool, request: &WriteRequest) -> Result<()> {
    if append_mode {
        ensure!(
            request.op_type == OpType::Put,
            InvalidRequestSnafu {
                region_id: request.region_id,
                reason: "DELETE is not allowed under append mode",
            }
        );
    }

    Ok(())
}