use std::collections::{HashMap, hash_map};
use std::sync::Arc;

use api::v1::OpType;
use common_telemetry::{debug, error};
use common_wal::options::WalOptions;
use snafu::ensure;
use store_api::codec::PrimaryKeyEncoding;
use store_api::logstore::LogStore;
use store_api::storage::RegionId;

use crate::error::{InvalidRequestSnafu, RegionStateSnafu, RejectWriteSnafu, Result};
use crate::metrics;
use crate::metrics::{
    WRITE_REJECT_TOTAL, WRITE_ROWS_TOTAL, WRITE_STAGE_ELAPSED, WRITE_STALL_TOTAL,
};
use crate::region::{RegionLeaderState, RegionRoleState};
use crate::region_write_ctx::RegionWriteCtx;
use crate::request::{SenderBulkRequest, SenderWriteRequest, WriteRequest};
use crate::worker::RegionWorkerLoop;

impl<S: LogStore> RegionWorkerLoop<S> {
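    /// Takes and handles all pending write and bulk requests.
    ///
    /// Requests are answered through their senders: they are rejected when the global
    /// write buffer is over the reject threshold, stalled when the write buffer manager
    /// asks to stall (and `allow_stall` is true), and otherwise written to the WAL and
    /// the memtables.
    ///
    /// A minimal sketch of how a worker loop might drain its request buffers into this
    /// method (the surrounding loop and buffer names are hypothetical):
    ///
    /// ```ignore
    /// // Requests collected from the worker's request channel elsewhere in the loop.
    /// worker_loop
    ///     .handle_write_requests(&mut write_buffer, &mut bulk_buffer, true)
    ///     .await;
    /// ```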
    pub(crate) async fn handle_write_requests(
        &mut self,
        write_requests: &mut Vec<SenderWriteRequest>,
        bulk_requests: &mut Vec<SenderBulkRequest>,
        allow_stall: bool,
    ) {
        if write_requests.is_empty() && bulk_requests.is_empty() {
            return;
        }

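        // Schedule a flush for this worker if it needs one before applying new writes.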
        self.maybe_flush_worker();

        if self.should_reject_write() {
            reject_write_requests(write_requests, bulk_requests);
            self.reject_stalled_requests();
            return;
        }

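        // Park the requests as stalled if the write buffer manager asks to stall
        // and stalling is allowed for this batch.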
        if self.write_buffer_manager.should_stall() && allow_stall {
            let stalled_count = (write_requests.len() + bulk_requests.len()) as i64;
            self.stalling_count.add(stalled_count);
            WRITE_STALL_TOTAL.inc_by(stalled_count as u64);
            self.stalled_requests.append(write_requests, bulk_requests);
            self.listener.on_write_stall();
            return;
        }

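        // Build a write context for every region touched by the requests.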
        let mut region_ctxs = {
            let _timer = WRITE_STAGE_ELAPSED
                .with_label_values(&["prepare_ctx"])
                .start_timer();
            self.prepare_region_write_ctx(write_requests, bulk_requests)
        };

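        // Write to the WAL first. Regions configured with `WalOptions::Noop` skip
        // the WAL entirely.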
        {
            let _timer = WRITE_STAGE_ELAPSED
                .with_label_values(&["write_wal"])
                .start_timer();
            let mut wal_writer = self.wal.writer();
            for region_ctx in region_ctxs.values_mut() {
                if let WalOptions::Noop = &region_ctx.version().options.wal_options {
                    continue;
                }
                if let Err(e) = region_ctx.add_wal_entry(&mut wal_writer).map_err(Arc::new) {
                    region_ctx.set_error(e);
                }
            }
            match wal_writer.write_to_wal().await.map_err(Arc::new) {
                Ok(response) => {
                    for (region_id, region_ctx) in region_ctxs.iter_mut() {
                        if let WalOptions::Noop = &region_ctx.version().options.wal_options {
                            continue;
                        }

                        // Safety: the WAL response is expected to contain a last entry id
                        // for every region that appended entries above.
                        let last_entry_id = response.last_entry_ids.get(region_id).unwrap();
                        region_ctx.set_next_entry_id(last_entry_id + 1);
                    }
                }
                Err(e) => {
                    // Failed to write the WAL: fail every region context with the error.
                    for mut region_ctx in region_ctxs.into_values() {
                        region_ctx.set_error(e.clone());
                    }
                    return;
                }
            }
        }
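        // Apply the mutations to memtables. A single region is written inline;
        // multiple regions are written concurrently on the global runtime.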
        let (mut put_rows, mut delete_rows) = (0, 0);
        {
            let _timer = WRITE_STAGE_ELAPSED
                .with_label_values(&["write_memtable"])
                .start_timer();
            if region_ctxs.len() == 1 {
                // Fast path: write the single region in the current task.
                let mut region_ctx = region_ctxs.into_values().next().unwrap();
                region_ctx.write_memtable().await;
                region_ctx.write_bulk().await;
                put_rows += region_ctx.put_num;
                delete_rows += region_ctx.delete_num;
            } else {
                let region_write_task = region_ctxs
                    .into_values()
                    .map(|mut region_ctx| {
                        // Spawn one task per region so regions are written concurrently.
                        common_runtime::spawn_global(async move {
                            region_ctx.write_memtable().await;
                            region_ctx.write_bulk().await;
                            (region_ctx.put_num, region_ctx.delete_num)
                        })
                    })
                    .collect::<Vec<_>>();

                for result in futures::future::join_all(region_write_task).await {
                    match result {
                        Ok((put, delete)) => {
                            put_rows += put;
                            delete_rows += delete;
                        }
                        Err(e) => {
                            error!(e; "unexpected error when joining region write tasks");
                        }
                    }
                }
            }
        }
        WRITE_ROWS_TOTAL
            .with_label_values(&["put"])
            .inc_by(put_rows as u64);
        WRITE_ROWS_TOTAL
            .with_label_values(&["delete"])
            .inc_by(delete_rows as u64);
    }

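    /// Handles all stalled write requests.
    ///
    /// Replays them through `handle_write_requests` with `allow_stall` set to `false`,
    /// so this batch does not stall on the write buffer again.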
    pub(crate) async fn handle_stalled_requests(&mut self) {
        let stalled = std::mem::take(&mut self.stalled_requests);
        self.stalling_count.sub(stalled.stalled_count() as i64);
        for (_, (_, mut requests, mut bulk)) in stalled.requests {
            self.handle_write_requests(&mut requests, &mut bulk, false)
                .await;
        }
    }

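    /// Rejects all stalled requests, replying to each sender with a reject error.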
    pub(crate) fn reject_stalled_requests(&mut self) {
        let stalled = std::mem::take(&mut self.stalled_requests);
        self.stalling_count.sub(stalled.stalled_count() as i64);
        for (_, (_, mut requests, mut bulk)) in stalled.requests {
            reject_write_requests(&mut requests, &mut bulk);
        }
    }

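    /// Rejects the stalled requests of a specific region.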
    pub(crate) fn reject_region_stalled_requests(&mut self, region_id: &RegionId) {
        debug!("Rejects stalled requests for region {}", region_id);
        let (mut requests, mut bulk) = self.stalled_requests.remove(region_id);
        self.stalling_count
            .sub((requests.len() + bulk.len()) as i64);
        reject_write_requests(&mut requests, &mut bulk);
    }

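    /// Handles the stalled requests of a specific region, allowing them to stall
    /// again if the write buffer is still under pressure.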
    pub(crate) async fn handle_region_stalled_requests(&mut self, region_id: &RegionId) {
        debug!("Handles stalled requests for region {}", region_id);
        let (mut requests, mut bulk) = self.stalled_requests.remove(region_id);
        self.stalling_count
            .sub((requests.len() + bulk.len()) as i64);
        self.handle_write_requests(&mut requests, &mut bulk, true)
            .await;
    }
}

impl<S> RegionWorkerLoop<S> {
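    /// Validates the requests and builds a `RegionWriteCtx` for each region involved.
    ///
    /// Both request lists are drained; requests that cannot be written right now are
    /// answered through their senders, stalled, or queued behind pending DDLs.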
    fn prepare_region_write_ctx(
        &mut self,
        write_requests: &mut Vec<SenderWriteRequest>,
        bulk_requests: &mut Vec<SenderBulkRequest>,
    ) -> HashMap<RegionId, RegionWriteCtx> {
        let mut region_ctxs = HashMap::new();
        self.process_write_requests(&mut region_ctxs, write_requests);
        self.process_bulk_requests(&mut region_ctxs, bulk_requests);
        region_ctxs
    }

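    /// Processes write requests, filling `region_ctxs` with the mutations to apply.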
    fn process_write_requests(
        &mut self,
        region_ctxs: &mut HashMap<RegionId, RegionWriteCtx>,
        write_requests: &mut Vec<SenderWriteRequest>,
    ) {
        for mut sender_req in write_requests.drain(..) {
            let region_id = sender_req.request.region_id;

            // If the region has pending DDL requests, queue the write behind them
            // so they are applied in order.
            if self.flush_scheduler.has_pending_ddls(region_id) {
                self.flush_scheduler
                    .add_write_request_to_pending(sender_req);
                continue;
            }

            // Create a write context for the region on first use.
            if let hash_map::Entry::Vacant(e) = region_ctxs.entry(region_id) {
                let Some(region) = self
                    .regions
                    .get_region_or(region_id, &mut sender_req.sender)
                else {
                    continue;
                };
                match region.state() {
                    RegionRoleState::Leader(RegionLeaderState::Writable)
                    | RegionRoleState::Leader(RegionLeaderState::Staging) => {
                        let region_ctx = RegionWriteCtx::new(
                            region.region_id,
                            &region.version_control,
                            region.provider.clone(),
                            Some(region.written_bytes.clone()),
                        );

                        e.insert(region_ctx);
                    }
                    RegionRoleState::Leader(RegionLeaderState::Altering) => {
                        debug!(
                            "Region {} is altering, add request to pending writes",
                            region.region_id
                        );
                        self.stalling_count.add(1);
                        WRITE_STALL_TOTAL.inc();
                        self.stalled_requests.push(sender_req);
                        continue;
                    }
                    state => {
                        // The region is not writable in its current state.
                        sender_req.sender.send(
                            RegionStateSnafu {
                                region_id,
                                state,
                                expect: RegionRoleState::Leader(RegionLeaderState::Writable),
                            }
                            .fail(),
                        );
                        continue;
                    }
                }
            }

            // Safety: the context exists, it was inserted above if it was missing.
            let region_ctx = region_ctxs.get_mut(&region_id).unwrap();

            // DELETE is not allowed when the region runs in append mode.
            if let Err(e) = check_op_type(
                region_ctx.version().options.append_mode,
                &sender_req.request,
            ) {
                sender_req.sender.send(Err(e));

                continue;
            }

            // Fill missing columns for dense-encoded primary keys when the request's
            // schema version differs from the region's, or when the request carries
            // no region metadata.
            let need_fill_missing_columns =
                if let Some(ref region_metadata) = sender_req.request.region_metadata {
                    region_ctx.version().metadata.schema_version != region_metadata.schema_version
                } else {
                    true
                };
            if need_fill_missing_columns
                && sender_req.request.primary_key_encoding() == PrimaryKeyEncoding::Dense
                && let Err(e) = sender_req
                    .request
                    .maybe_fill_missing_columns(&region_ctx.version().metadata)
            {
                sender_req.sender.send(Err(e));

                continue;
            }

            // Collect the mutation into the region's write context.
            region_ctx.push_mutation(
                sender_req.request.op_type as i32,
                Some(sender_req.request.rows),
                sender_req.request.hint,
                sender_req.sender,
                None,
            );
        }
    }

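    /// Processes bulk insert requests, filling `region_ctxs` with the payloads to write.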
    fn process_bulk_requests(
        &mut self,
        region_ctxs: &mut HashMap<RegionId, RegionWriteCtx>,
        requests: &mut Vec<SenderBulkRequest>,
    ) {
        let _timer = metrics::REGION_WORKER_HANDLE_WRITE_ELAPSED
            .with_label_values(&["prepare_bulk_request"])
            .start_timer();
        for mut bulk_req in requests.drain(..) {
            let region_id = bulk_req.region_id;
            if self.flush_scheduler.has_pending_ddls(region_id) {
                // Queue the bulk request behind the pending DDLs so they apply in order.
                self.flush_scheduler.add_bulk_request_to_pending(bulk_req);
                continue;
            }

            // Create a write context for the region on first use.
            if let hash_map::Entry::Vacant(e) = region_ctxs.entry(region_id) {
                let Some(region) = self.regions.get_region_or(region_id, &mut bulk_req.sender)
                else {
                    continue;
                };
                match region.state() {
                    RegionRoleState::Leader(RegionLeaderState::Writable) => {
                        let region_ctx = RegionWriteCtx::new(
                            region.region_id,
                            &region.version_control,
                            region.provider.clone(),
                            Some(region.written_bytes.clone()),
                        );

                        e.insert(region_ctx);
                    }
                    RegionRoleState::Leader(RegionLeaderState::Altering) => {
                        debug!(
                            "Region {} is altering, add request to pending writes",
                            region.region_id
                        );
                        self.stalling_count.add(1);
                        WRITE_STALL_TOTAL.inc();
                        self.stalled_requests.push_bulk(bulk_req);
                        continue;
                    }
                    state => {
                        // The region is not writable in its current state.
                        bulk_req.sender.send(
                            RegionStateSnafu {
                                region_id,
                                state,
                                expect: RegionRoleState::Leader(RegionLeaderState::Writable),
                            }
                            .fail(),
                        );
                        continue;
                    }
                }
            }

            // Safety: the context exists, it was inserted above if it was missing.
            let region_ctx = region_ctxs.get_mut(&region_id).unwrap();

            // Bulk requests don't fill missing columns; their schema version must match
            // the region's current schema version.
            let need_fill_missing_columns = region_ctx.version().metadata.schema_version
                != bulk_req.region_metadata.schema_version;

            if need_fill_missing_columns {
                bulk_req.sender.send(
                    InvalidRequestSnafu {
                        region_id,
                        reason: "Schema mismatch",
                    }
                    .fail(),
                );
                return;
            }

            if !region_ctx.push_bulk(bulk_req.sender, bulk_req.request, None) {
                return;
            }
        }
    }

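    /// Returns whether incoming writes should be rejected rather than stalled.
    ///
    /// Writes are rejected once the write buffer's memory usage plus the estimated size
    /// of stalled requests reaches `global_write_buffer_reject_size`. For example, with
    /// a 1 GiB reject size, 900 MiB of buffered writes and 200 MiB of stalled requests
    /// would reject new writes.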
    pub(crate) fn should_reject_write(&self) -> bool {
        self.write_buffer_manager.memory_usage() + self.stalled_requests.estimated_size
            >= self.config.global_write_buffer_reject_size.as_bytes() as usize
    }
}

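/// Rejects all requests in both lists, replying to every sender with a reject error.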
fn reject_write_requests(
    write_requests: &mut Vec<SenderWriteRequest>,
    bulk_requests: &mut Vec<SenderBulkRequest>,
) {
    WRITE_REJECT_TOTAL.inc_by(write_requests.len() as u64);

    for req in write_requests.drain(..) {
        req.sender.send(
            RejectWriteSnafu {
                region_id: req.request.region_id,
            }
            .fail(),
        );
    }
    for req in bulk_requests.drain(..) {
        let region_id = req.region_id;
        req.sender.send(RejectWriteSnafu { region_id }.fail());
    }
}

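/// Checks the request's op type: only `Put` is allowed when the region is in append mode.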
fn check_op_type(append_mode: bool, request: &WriteRequest) -> Result<()> {
    if append_mode {
        ensure!(
            request.op_type == OpType::Put,
            InvalidRequestSnafu {
                region_id: request.region_id,
                reason: "DELETE is not allowed under append mode",
            }
        );
    }

    Ok(())
}