use std::collections::{hash_map, HashMap};
use std::sync::Arc;

use api::v1::OpType;
use common_telemetry::{debug, error};
use common_wal::options::WalOptions;
use snafu::ensure;
use store_api::codec::PrimaryKeyEncoding;
use store_api::logstore::LogStore;
use store_api::storage::RegionId;

use crate::error::{InvalidRequestSnafu, RegionStateSnafu, RejectWriteSnafu, Result};
use crate::metrics;
use crate::metrics::{
    WRITE_REJECT_TOTAL, WRITE_ROWS_TOTAL, WRITE_STAGE_ELAPSED, WRITE_STALL_TOTAL,
};
use crate::region::{RegionLeaderState, RegionRoleState};
use crate::region_write_ctx::RegionWriteCtx;
use crate::request::{SenderBulkRequest, SenderWriteRequest, WriteRequest};
use crate::worker::RegionWorkerLoop;

impl<S: LogStore> RegionWorkerLoop<S> {
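    /// Takes and handles all pending write and bulk write requests.
    ///
    /// When `allow_stall` is true, requests may be stalled (buffered) instead of
    /// written if the write buffer manager reports memory pressure.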
    pub(crate) async fn handle_write_requests(
        &mut self,
        write_requests: &mut Vec<SenderWriteRequest>,
        bulk_requests: &mut Vec<SenderBulkRequest>,
        allow_stall: bool,
    ) {
        if write_requests.is_empty() && bulk_requests.is_empty() {
            return;
        }

        self.maybe_flush_worker();

        if self.should_reject_write() {
            reject_write_requests(write_requests, bulk_requests);
            self.reject_stalled_requests();
            return;
        }

        if self.write_buffer_manager.should_stall() && allow_stall {
            let stalled_count = (write_requests.len() + bulk_requests.len()) as i64;
            self.stalling_count.add(stalled_count);
            WRITE_STALL_TOTAL.inc_by(stalled_count as u64);
            self.stalled_requests.append(write_requests, bulk_requests);
            self.listener.on_write_stall();
            return;
        }

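        // Prepares a write context for each region involved in the requests.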
        let mut region_ctxs = {
            let _timer = WRITE_STAGE_ELAPSED
                .with_label_values(&["prepare_ctx"])
                .start_timer();
            self.prepare_region_write_ctx(write_requests, bulk_requests)
        };

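        // Writes WAL entries for each region, skipping regions whose WAL option is no-op.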
        {
            let _timer = WRITE_STAGE_ELAPSED
                .with_label_values(&["write_wal"])
                .start_timer();
            let mut wal_writer = self.wal.writer();
            for region_ctx in region_ctxs.values_mut() {
                if let WalOptions::Noop = &region_ctx.version().options.wal_options {
                    continue;
                }
                if let Err(e) = region_ctx.add_wal_entry(&mut wal_writer).map_err(Arc::new) {
                    region_ctx.set_error(e);
                }
            }
            match wal_writer.write_to_wal().await.map_err(Arc::new) {
                Ok(response) => {
                    for (region_id, region_ctx) in region_ctxs.iter_mut() {
                        if let WalOptions::Noop = &region_ctx.version().options.wal_options {
                            continue;
                        }

                        let last_entry_id = response.last_entry_ids.get(region_id).unwrap();
                        region_ctx.set_next_entry_id(last_entry_id + 1);
                    }
                }
                Err(e) => {
                    for mut region_ctx in region_ctxs.into_values() {
                        region_ctx.set_error(e.clone());
                    }
                    return;
                }
            }
        }

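        // Writes rows to memtables. Spawns one task per region when more than one
        // region is involved.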
        let (mut put_rows, mut delete_rows) = (0, 0);
        {
            let _timer = WRITE_STAGE_ELAPSED
                .with_label_values(&["write_memtable"])
                .start_timer();
            if region_ctxs.len() == 1 {
                let mut region_ctx = region_ctxs.into_values().next().unwrap();
                region_ctx.write_memtable().await;
                region_ctx.write_bulk().await;
                put_rows += region_ctx.put_num;
                delete_rows += region_ctx.delete_num;
            } else {
                let region_write_task = region_ctxs
                    .into_values()
                    .map(|mut region_ctx| {
                        common_runtime::spawn_global(async move {
                            region_ctx.write_memtable().await;
                            region_ctx.write_bulk().await;
                            (region_ctx.put_num, region_ctx.delete_num)
                        })
                    })
                    .collect::<Vec<_>>();

                for result in futures::future::join_all(region_write_task).await {
                    match result {
                        Ok((put, delete)) => {
                            put_rows += put;
                            delete_rows += delete;
                        }
                        Err(e) => {
                            error!(e; "unexpected error when joining region write tasks");
                        }
                    }
                }
            }
        }
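        // Updates the put/delete row counters.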
        WRITE_ROWS_TOTAL
            .with_label_values(&["put"])
            .inc_by(put_rows as u64);
        WRITE_ROWS_TOTAL
            .with_label_values(&["delete"])
            .inc_by(delete_rows as u64);
    }

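    /// Handles all currently stalled requests by retrying them without allowing
    /// them to stall again.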
    pub(crate) async fn handle_stalled_requests(&mut self) {
        let stalled = std::mem::take(&mut self.stalled_requests);
        self.stalling_count.sub(stalled.stalled_count() as i64);
        for (_, (_, mut requests, mut bulk)) in stalled.requests {
            self.handle_write_requests(&mut requests, &mut bulk, false)
                .await;
        }
    }

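    /// Rejects all currently stalled requests.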
    pub(crate) fn reject_stalled_requests(&mut self) {
        let stalled = std::mem::take(&mut self.stalled_requests);
        self.stalling_count.sub(stalled.stalled_count() as i64);
        for (_, (_, mut requests, mut bulk)) in stalled.requests {
            reject_write_requests(&mut requests, &mut bulk);
        }
    }

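    /// Rejects the stalled requests of a specific region.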
    pub(crate) fn reject_region_stalled_requests(&mut self, region_id: &RegionId) {
        debug!("Rejects stalled requests for region {}", region_id);
        let (mut requests, mut bulk) = self.stalled_requests.remove(region_id);
        self.stalling_count
            .sub((requests.len() + bulk.len()) as i64);
        reject_write_requests(&mut requests, &mut bulk);
    }

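    /// Retries the stalled requests of a specific region, allowing them to stall
    /// again if the write buffer is still under pressure.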
    pub(crate) async fn handle_region_stalled_requests(&mut self, region_id: &RegionId) {
        debug!("Handles stalled requests for region {}", region_id);
        let (mut requests, mut bulk) = self.stalled_requests.remove(region_id);
        self.stalling_count
            .sub((requests.len() + bulk.len()) as i64);
        self.handle_write_requests(&mut requests, &mut bulk, true)
            .await;
    }
}

impl<S> RegionWorkerLoop<S> {
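    /// Validates the requests and builds a [`RegionWriteCtx`] for every region
    /// that will be written to.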
    fn prepare_region_write_ctx(
        &mut self,
        write_requests: &mut Vec<SenderWriteRequest>,
        bulk_requests: &mut Vec<SenderBulkRequest>,
    ) -> HashMap<RegionId, RegionWriteCtx> {
        let mut region_ctxs = HashMap::new();
        self.process_write_requests(&mut region_ctxs, write_requests);
        self.process_bulk_requests(&mut region_ctxs, bulk_requests);
        region_ctxs
    }

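    /// Validates each write request and pushes its mutation into the write context
    /// of its target region, creating the context on first use.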
    fn process_write_requests(
        &mut self,
        region_ctxs: &mut HashMap<RegionId, RegionWriteCtx>,
        write_requests: &mut Vec<SenderWriteRequest>,
    ) {
        for mut sender_req in write_requests.drain(..) {
            let region_id = sender_req.request.region_id;

            if self.flush_scheduler.has_pending_ddls(region_id) {
                self.flush_scheduler
                    .add_write_request_to_pending(sender_req);
                continue;
            }

            if let hash_map::Entry::Vacant(e) = region_ctxs.entry(region_id) {
                let Some(region) = self
                    .regions
                    .get_region_or(region_id, &mut sender_req.sender)
                else {
                    continue;
                };
                match region.state() {
                    RegionRoleState::Leader(RegionLeaderState::Writable)
                    | RegionRoleState::Leader(RegionLeaderState::Staging) => {
                        let region_ctx = RegionWriteCtx::new(
                            region.region_id,
                            &region.version_control,
                            region.provider.clone(),
                            Some(region.written_bytes.clone()),
                        );

                        e.insert(region_ctx);
                    }
                    RegionRoleState::Leader(RegionLeaderState::Altering) => {
                        debug!(
                            "Region {} is altering, add request to pending writes",
                            region.region_id
                        );
                        self.stalling_count.add(1);
                        WRITE_STALL_TOTAL.inc();
                        self.stalled_requests.push(sender_req);
                        continue;
                    }
                    state => {
                        sender_req.sender.send(
                            RegionStateSnafu {
                                region_id,
                                state,
                                expect: RegionRoleState::Leader(RegionLeaderState::Writable),
                            }
                            .fail(),
                        );
                        continue;
                    }
                }
            }

            let region_ctx = region_ctxs.get_mut(&region_id).unwrap();

            if let Err(e) = check_op_type(
                region_ctx.version().options.append_mode,
                &sender_req.request,
            ) {
                sender_req.sender.send(Err(e));

                continue;
            }

            let need_fill_missing_columns =
                if let Some(ref region_metadata) = sender_req.request.region_metadata {
                    region_ctx.version().metadata.schema_version != region_metadata.schema_version
                } else {
                    true
                };
            if need_fill_missing_columns
                && sender_req.request.primary_key_encoding() == PrimaryKeyEncoding::Dense
            {
                if let Err(e) = sender_req
                    .request
                    .maybe_fill_missing_columns(&region_ctx.version().metadata)
                {
                    sender_req.sender.send(Err(e));

                    continue;
                }
            }

            region_ctx.push_mutation(
                sender_req.request.op_type as i32,
                Some(sender_req.request.rows),
                sender_req.request.hint,
                sender_req.sender,
            );
        }
    }

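    /// Validates each bulk request and pushes it into the write context of its
    /// target region, creating the context on first use.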
    fn process_bulk_requests(
        &mut self,
        region_ctxs: &mut HashMap<RegionId, RegionWriteCtx>,
        requests: &mut Vec<SenderBulkRequest>,
    ) {
        let _timer = metrics::REGION_WORKER_HANDLE_WRITE_ELAPSED
            .with_label_values(&["prepare_bulk_request"])
            .start_timer();
        for mut bulk_req in requests.drain(..) {
            let region_id = bulk_req.region_id;
            if self.flush_scheduler.has_pending_ddls(region_id) {
                self.flush_scheduler.add_bulk_request_to_pending(bulk_req);
                continue;
            }

            if let hash_map::Entry::Vacant(e) = region_ctxs.entry(region_id) {
                let Some(region) = self.regions.get_region_or(region_id, &mut bulk_req.sender)
                else {
                    continue;
                };
                match region.state() {
                    RegionRoleState::Leader(RegionLeaderState::Writable) => {
                        let region_ctx = RegionWriteCtx::new(
                            region.region_id,
                            &region.version_control,
                            region.provider.clone(),
                            Some(region.written_bytes.clone()),
                        );

                        e.insert(region_ctx);
                    }
                    RegionRoleState::Leader(RegionLeaderState::Altering) => {
                        debug!(
                            "Region {} is altering, add request to pending writes",
                            region.region_id
                        );
                        self.stalling_count.add(1);
                        WRITE_STALL_TOTAL.inc();
                        self.stalled_requests.push_bulk(bulk_req);
                        continue;
                    }
                    state => {
                        bulk_req.sender.send(
                            RegionStateSnafu {
                                region_id,
                                state,
                                expect: RegionRoleState::Leader(RegionLeaderState::Writable),
                            }
                            .fail(),
                        );
                        continue;
                    }
                }
            }

            let region_ctx = region_ctxs.get_mut(&region_id).unwrap();

            let need_fill_missing_columns = region_ctx.version().metadata.schema_version
                != bulk_req.region_metadata.schema_version;

            if need_fill_missing_columns {
                bulk_req.sender.send(
                    InvalidRequestSnafu {
                        region_id,
                        reason: "Schema mismatch",
                    }
                    .fail(),
                );
                return;
            }

            if !region_ctx.push_bulk(bulk_req.sender, bulk_req.request) {
                return;
            }
        }
    }

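    /// Returns true if the worker should reject incoming write requests, i.e. when
    /// the memory used by write buffers plus stalled requests reaches the global
    /// reject threshold.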
    pub(crate) fn should_reject_write(&self) -> bool {
        self.write_buffer_manager.memory_usage() + self.stalled_requests.estimated_size
            >= self.config.global_write_buffer_reject_size.as_bytes() as usize
    }
}

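/// Rejects all requests in `write_requests` and `bulk_requests`, replying to each
/// sender with a reject error.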
fn reject_write_requests(
    write_requests: &mut Vec<SenderWriteRequest>,
    bulk_requests: &mut Vec<SenderBulkRequest>,
) {
    WRITE_REJECT_TOTAL.inc_by(write_requests.len() as u64);

    for req in write_requests.drain(..) {
        req.sender.send(
            RejectWriteSnafu {
                region_id: req.request.region_id,
            }
            .fail(),
        );
    }
    for req in bulk_requests.drain(..) {
        let region_id = req.region_id;
        req.sender.send(RejectWriteSnafu { region_id }.fail());
    }
}

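/// Checks whether the request's op type is allowed: DELETE requests are rejected
/// when the region runs in append mode.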
fn check_op_type(append_mode: bool, request: &WriteRequest) -> Result<()> {
    if append_mode {
        ensure!(
            request.op_type == OpType::Put,
            InvalidRequestSnafu {
                region_id: request.region_id,
                reason: "DELETE is not allowed under append mode",
            }
        );
    }

    Ok(())
}