use std::collections::{hash_map, HashMap};
use std::sync::Arc;

use api::v1::OpType;
use common_telemetry::{debug, error};
use common_wal::options::WalOptions;
use snafu::ensure;
use store_api::codec::PrimaryKeyEncoding;
use store_api::logstore::LogStore;
use store_api::storage::RegionId;

use crate::error::{InvalidRequestSnafu, RegionStateSnafu, RejectWriteSnafu, Result};
use crate::metrics;
use crate::metrics::{WRITE_REJECT_TOTAL, WRITE_ROWS_TOTAL, WRITE_STAGE_ELAPSED};
use crate::region::{RegionLeaderState, RegionRoleState};
use crate::region_write_ctx::RegionWriteCtx;
use crate::request::{SenderBulkRequest, SenderWriteRequest, WriteRequest};
use crate::worker::RegionWorkerLoop;

impl<S: LogStore> RegionWorkerLoop<S> {
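    /// Takes and handles all write requests in `write_requests` and `bulk_requests`.
    ///
    /// Requests may be written, stalled (when the write buffer is full and
    /// `allow_stall` is true) or rejected (when memory usage reaches the reject
    /// threshold).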
    pub(crate) async fn handle_write_requests(
        &mut self,
        write_requests: &mut Vec<SenderWriteRequest>,
        bulk_requests: &mut Vec<SenderBulkRequest>,
        allow_stall: bool,
    ) {
        if write_requests.is_empty() && bulk_requests.is_empty() {
            return;
        }

        // Flush this worker if the write buffer is getting full.
        self.maybe_flush_worker();

        // Memory usage is too high: reject both the incoming and the stalled requests.
        if self.should_reject_write() {
            reject_write_requests(write_requests, bulk_requests);
            self.reject_stalled_requests();
            return;
        }

        // The write buffer is full: stall the requests until a flush frees memory.
        if self.write_buffer_manager.should_stall() && allow_stall {
            self.stalled_count
                .add((write_requests.len() + bulk_requests.len()) as i64);
            self.stalled_requests.append(write_requests, bulk_requests);
            self.listener.on_write_stall();
            return;
        }

        // Build a write context for each region involved in the requests.
        let mut region_ctxs = {
            let _timer = WRITE_STAGE_ELAPSED
                .with_label_values(&["prepare_ctx"])
                .start_timer();
            self.prepare_region_write_ctx(write_requests, bulk_requests)
        };

        // Write the requests to the WAL.
        {
            let _timer = WRITE_STAGE_ELAPSED
                .with_label_values(&["write_wal"])
                .start_timer();
            let mut wal_writer = self.wal.writer();
            for region_ctx in region_ctxs.values_mut() {
                // Regions configured with a no-op WAL skip the WAL entirely.
                if let WalOptions::Noop = &region_ctx.version().options.wal_options {
                    continue;
                }
                if let Err(e) = region_ctx.add_wal_entry(&mut wal_writer).map_err(Arc::new) {
                    region_ctx.set_error(e);
                }
            }
            match wal_writer.write_to_wal().await.map_err(Arc::new) {
                Ok(response) => {
                    for (region_id, region_ctx) in region_ctxs.iter_mut() {
                        if let WalOptions::Noop = &region_ctx.version().options.wal_options {
                            continue;
                        }

                        let last_entry_id = response.last_entry_ids.get(region_id).unwrap();
                        region_ctx.set_next_entry_id(last_entry_id + 1);
                    }
                }
                Err(e) => {
                    // The WAL write failed: mark every region context with the error and bail out.
                    for mut region_ctx in region_ctxs.into_values() {
                        region_ctx.set_error(e.clone());
                    }
                    return;
                }
            }
        }

        // Write the requests to memtables.
        let (mut put_rows, mut delete_rows) = (0, 0);
        {
            let _timer = WRITE_STAGE_ELAPSED
                .with_label_values(&["write_memtable"])
                .start_timer();
            if region_ctxs.len() == 1 {
                // Fast path: a single region is written on the worker loop directly.
                let mut region_ctx = region_ctxs.into_values().next().unwrap();
                region_ctx.write_memtable().await;
                region_ctx.write_bulk().await;
                put_rows += region_ctx.put_num;
                delete_rows += region_ctx.delete_num;
            } else {
                // Multiple regions are written in parallel on the global runtime.
                let region_write_task = region_ctxs
                    .into_values()
                    .map(|mut region_ctx| {
                        common_runtime::spawn_global(async move {
                            region_ctx.write_memtable().await;
                            region_ctx.write_bulk().await;
                            (region_ctx.put_num, region_ctx.delete_num)
                        })
                    })
                    .collect::<Vec<_>>();

                for result in futures::future::join_all(region_write_task).await {
                    match result {
                        Ok((put, delete)) => {
                            put_rows += put;
                            delete_rows += delete;
                        }
                        Err(e) => {
                            error!(e; "unexpected error when joining region write tasks");
                        }
                    }
                }
            }
        }
        WRITE_ROWS_TOTAL
            .with_label_values(&["put"])
            .inc_by(put_rows as u64);
        WRITE_ROWS_TOTAL
            .with_label_values(&["delete"])
            .inc_by(delete_rows as u64);
    }

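    /// Handles all stalled write requests by replaying them through
    /// `handle_write_requests` with stalling disabled.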
    pub(crate) async fn handle_stalled_requests(&mut self) {
        let stalled = std::mem::take(&mut self.stalled_requests);
        self.stalled_count.sub(stalled.stalled_count() as i64);
        for (_, (_, mut requests, mut bulk)) in stalled.requests {
            self.handle_write_requests(&mut requests, &mut bulk, false)
                .await;
        }
    }

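    /// Rejects all stalled write requests.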
    pub(crate) fn reject_stalled_requests(&mut self) {
        let stalled = std::mem::take(&mut self.stalled_requests);
        self.stalled_count.sub(stalled.stalled_count() as i64);
        for (_, (_, mut requests, mut bulk)) in stalled.requests {
            reject_write_requests(&mut requests, &mut bulk);
        }
    }

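    /// Rejects the stalled requests of a specific region.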
    pub(crate) fn reject_region_stalled_requests(&mut self, region_id: &RegionId) {
        debug!("Rejects stalled requests for region {}", region_id);
        let (mut requests, mut bulk) = self.stalled_requests.remove(region_id);
        self.stalled_count.sub((requests.len() + bulk.len()) as i64);
        reject_write_requests(&mut requests, &mut bulk);
    }

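    /// Handles the stalled requests of a specific region.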
    pub(crate) async fn handle_region_stalled_requests(&mut self, region_id: &RegionId) {
        debug!("Handles stalled requests for region {}", region_id);
        let (mut requests, mut bulk) = self.stalled_requests.remove(region_id);
        self.stalled_count.sub((requests.len() + bulk.len()) as i64);
        self.handle_write_requests(&mut requests, &mut bulk, true)
            .await;
    }
}

impl<S> RegionWorkerLoop<S> {
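    /// Builds a [RegionWriteCtx] per region for the given write and bulk requests.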
    fn prepare_region_write_ctx(
        &mut self,
        write_requests: &mut Vec<SenderWriteRequest>,
        bulk_requests: &mut Vec<SenderBulkRequest>,
    ) -> HashMap<RegionId, RegionWriteCtx> {
        let mut region_ctxs = HashMap::new();
        self.process_write_requests(&mut region_ctxs, write_requests);
        self.process_bulk_requests(&mut region_ctxs, bulk_requests);
        region_ctxs
    }

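    /// Validates each write request, defers it to pending DDLs or stalled requests when
    /// necessary, and pushes the rest into the write contexts in `region_ctxs`.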
    fn process_write_requests(
        &mut self,
        region_ctxs: &mut HashMap<RegionId, RegionWriteCtx>,
        write_requests: &mut Vec<SenderWriteRequest>,
    ) {
        for mut sender_req in write_requests.drain(..) {
            let region_id = sender_req.request.region_id;

            // If the region has pending DDLs, defer the write until they are done.
            if self.flush_scheduler.has_pending_ddls(region_id) {
                self.flush_scheduler
                    .add_write_request_to_pending(sender_req);
                continue;
            }

            // Create a write context for the region on first use.
            if let hash_map::Entry::Vacant(e) = region_ctxs.entry(region_id) {
                let Some(region) = self
                    .regions
                    .get_region_or(region_id, &mut sender_req.sender)
                else {
                    continue;
                };
                match region.state() {
                    RegionRoleState::Leader(RegionLeaderState::Writable) => {
                        let region_ctx = RegionWriteCtx::new(
                            region.region_id,
                            &region.version_control,
                            region.provider.clone(),
                        );

                        e.insert(region_ctx);
                    }
                    RegionRoleState::Leader(RegionLeaderState::Altering) => {
                        debug!(
                            "Region {} is altering, add request to pending writes",
                            region.region_id
                        );
                        self.stalled_count.add(1);
                        self.stalled_requests.push(sender_req);
                        continue;
                    }
                    state => {
                        // The region is not writable: reply with a region state error.
                        sender_req.sender.send(
                            RegionStateSnafu {
                                region_id,
                                state,
                                expect: RegionRoleState::Leader(RegionLeaderState::Writable),
                            }
                            .fail(),
                        );
                        continue;
                    }
                }
            }

            let region_ctx = region_ctxs.get_mut(&region_id).unwrap();

            // Reject DELETEs for append-only regions.
            if let Err(e) = check_op_type(
                region_ctx.version().options.append_mode,
                &sender_req.request,
            ) {
                sender_req.sender.send(Err(e));

                continue;
            }

            // Fill missing columns if the request was built against an older schema.
            let need_fill_missing_columns =
                if let Some(ref region_metadata) = sender_req.request.region_metadata {
                    region_ctx.version().metadata.schema_version != region_metadata.schema_version
                } else {
                    true
                };
            if need_fill_missing_columns
                && sender_req.request.primary_key_encoding() == PrimaryKeyEncoding::Dense
            {
                if let Err(e) = sender_req
                    .request
                    .maybe_fill_missing_columns(&region_ctx.version().metadata)
                {
                    sender_req.sender.send(Err(e));

                    continue;
                }
            }

            // Collect the rows into the region's write context.
            region_ctx.push_mutation(
                sender_req.request.op_type as i32,
                Some(sender_req.request.rows),
                sender_req.request.hint,
                sender_req.sender,
            );
        }
    }

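    /// Validates each bulk request and pushes it into the write contexts in
    /// `region_ctxs`, deferring or rejecting it when the region is not ready.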
    fn process_bulk_requests(
        &mut self,
        region_ctxs: &mut HashMap<RegionId, RegionWriteCtx>,
        requests: &mut Vec<SenderBulkRequest>,
    ) {
        let _timer = metrics::REGION_WORKER_HANDLE_WRITE_ELAPSED
            .with_label_values(&["prepare_bulk_request"])
            .start_timer();
        for mut bulk_req in requests.drain(..) {
            let region_id = bulk_req.region_id;
            // If the region has pending DDLs, defer the bulk request until they are done.
            if self.flush_scheduler.has_pending_ddls(region_id) {
                self.flush_scheduler.add_bulk_request_to_pending(bulk_req);
                continue;
            }

            // Create a write context for the region on first use.
            if let hash_map::Entry::Vacant(e) = region_ctxs.entry(region_id) {
                let Some(region) = self.regions.get_region_or(region_id, &mut bulk_req.sender)
                else {
                    continue;
                };
                match region.state() {
                    RegionRoleState::Leader(RegionLeaderState::Writable) => {
                        let region_ctx = RegionWriteCtx::new(
                            region.region_id,
                            &region.version_control,
                            region.provider.clone(),
                        );

                        e.insert(region_ctx);
                    }
                    RegionRoleState::Leader(RegionLeaderState::Altering) => {
                        debug!(
                            "Region {} is altering, add request to pending writes",
                            region.region_id
                        );
                        self.stalled_count.add(1);
                        self.stalled_requests.push_bulk(bulk_req);
                        continue;
                    }
                    state => {
                        // The region is not writable: reply with a region state error.
                        bulk_req.sender.send(
                            RegionStateSnafu {
                                region_id,
                                state,
                                expect: RegionRoleState::Leader(RegionLeaderState::Writable),
                            }
                            .fail(),
                        );
                        continue;
                    }
                }
            }

            let region_ctx = region_ctxs.get_mut(&region_id).unwrap();

            // Reject bulk requests built against a stale schema version.
            let need_fill_missing_columns = region_ctx.version().metadata.schema_version
                != bulk_req.region_metadata.schema_version;

            if need_fill_missing_columns {
                bulk_req.sender.send(
                    InvalidRequestSnafu {
                        region_id,
                        reason: "Schema mismatch",
                    }
                    .fail(),
                );
                // Skip this request and keep processing the rest of the batch.
                continue;
            }

            region_ctx.push_bulk(bulk_req.sender, bulk_req.request);
        }
    }

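    /// Returns true if the worker should reject incoming writes because the total
    /// memory usage (write buffer plus stalled requests) reaches the configured
    /// global reject threshold.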
    pub(crate) fn should_reject_write(&self) -> bool {
        self.write_buffer_manager.memory_usage() + self.stalled_requests.estimated_size
            >= self.config.global_write_buffer_reject_size.as_bytes() as usize
    }
}

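/// Rejects all requests in `write_requests` and `bulk_requests` by replying with a
/// reject-write error to their senders.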
fn reject_write_requests(
    write_requests: &mut Vec<SenderWriteRequest>,
    bulk_requests: &mut Vec<SenderBulkRequest>,
) {
    WRITE_REJECT_TOTAL.inc_by(write_requests.len() as u64);

    for req in write_requests.drain(..) {
        req.sender.send(
            RejectWriteSnafu {
                region_id: req.request.region_id,
            }
            .fail(),
        );
    }
    for req in bulk_requests.drain(..) {
        let region_id = req.region_id;
        req.sender.send(RejectWriteSnafu { region_id }.fail());
    }
}

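/// Checks that the request's op type is allowed: only `Put` is accepted when the
/// region runs in append mode.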
fn check_op_type(append_mode: bool, request: &WriteRequest) -> Result<()> {
    if append_mode {
        ensure!(
            request.op_type == OpType::Put,
            InvalidRequestSnafu {
                region_id: request.region_id,
                reason: "DELETE is not allowed under append mode",
            }
        );
    }

    Ok(())
}