// mito2/worker/handle_write.rs

use std::collections::{hash_map, HashMap};
use std::sync::Arc;

use api::v1::OpType;
use common_telemetry::{debug, error};
use common_wal::options::WalOptions;
use snafu::ensure;
use store_api::codec::PrimaryKeyEncoding;
use store_api::logstore::LogStore;
use store_api::storage::RegionId;

use crate::error::{InvalidRequestSnafu, RegionStateSnafu, RejectWriteSnafu, Result};
use crate::metrics::{WRITE_REJECT_TOTAL, WRITE_ROWS_TOTAL, WRITE_STAGE_ELAPSED};
use crate::region::{RegionLeaderState, RegionRoleState};
use crate::region_write_ctx::RegionWriteCtx;
use crate::request::{SenderWriteRequest, WriteRequest};
use crate::worker::RegionWorkerLoop;

impl<S: LogStore> RegionWorkerLoop<S> {
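    /// Takes and handles all write requests.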
    pub(crate) async fn handle_write_requests(
        &mut self,
        write_requests: &mut Vec<SenderWriteRequest>,
        allow_stall: bool,
    ) {
        if write_requests.is_empty() {
            return;
        }

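        // Flush this worker if the engine needs to flush.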
        self.maybe_flush_worker();

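        // Memory pressure is still too high, reject the incoming requests.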
        if self.should_reject_write() {
            reject_write_requests(write_requests);
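            // Also reject all stalled requests.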
            self.reject_stalled_requests();
            return;
        }

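        // Stall the requests instead of writing them while the buffer is full.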
        if self.write_buffer_manager.should_stall() && allow_stall {
            self.stalled_count.add(write_requests.len() as i64);
            self.stalled_requests.append(write_requests);
            self.listener.on_write_stall();
            return;
        }

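        // Prepare write context.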
        let mut region_ctxs = {
            let _timer = WRITE_STAGE_ELAPSED
                .with_label_values(&["prepare_ctx"])
                .start_timer();
            self.prepare_region_write_ctx(write_requests)
        };

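        // Write to the WAL.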
        {
            let _timer = WRITE_STAGE_ELAPSED
                .with_label_values(&["write_wal"])
                .start_timer();
            let mut wal_writer = self.wal.writer();
            for region_ctx in region_ctxs.values_mut() {
                if let WalOptions::Noop = &region_ctx.version().options.wal_options {
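                    // The region does not write to the WAL, skip it.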
                    continue;
                }
                if let Err(e) = region_ctx.add_wal_entry(&mut wal_writer).map_err(Arc::new) {
                    region_ctx.set_error(e);
                }
            }
            match wal_writer.write_to_wal().await.map_err(Arc::new) {
                Ok(response) => {
                    for (region_id, region_ctx) in region_ctxs.iter_mut() {
                        if let WalOptions::Noop = &region_ctx.version().options.wal_options {
                            continue;
                        }

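                        // Safety: the response must contain the last entry id of every
                        // region that wrote to the WAL.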
                        let last_entry_id = response.last_entry_ids.get(region_id).unwrap();
                        region_ctx.set_next_entry_id(last_entry_id + 1);
                    }
                }
                Err(e) => {
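                    // Failed to write the WAL, set the error for all regions.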
                    for mut region_ctx in region_ctxs.into_values() {
                        region_ctx.set_error(e.clone());
                    }
                    return;
                }
            }
        }

        let (mut put_rows, mut delete_rows) = (0, 0);
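        // Write to memtables.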
        {
            let _timer = WRITE_STAGE_ELAPSED
                .with_label_values(&["write_memtable"])
                .start_timer();
            if region_ctxs.len() == 1 {
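                // Fast path for a single region: write in the current task.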
                let mut region_ctx = region_ctxs.into_values().next().unwrap();
                region_ctx.write_memtable().await;
                put_rows += region_ctx.put_num;
                delete_rows += region_ctx.delete_num;
            } else {
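                // Multiple regions: write their memtables in parallel.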
                let region_write_task = region_ctxs
                    .into_values()
                    .map(|mut region_ctx| {
                        common_runtime::spawn_global(async move {
                            region_ctx.write_memtable().await;
                            (region_ctx.put_num, region_ctx.delete_num)
                        })
                    })
                    .collect::<Vec<_>>();

                for result in futures::future::join_all(region_write_task).await {
                    match result {
                        Ok((put, delete)) => {
                            put_rows += put;
                            delete_rows += delete;
                        }
                        Err(e) => {
                            error!(e; "unexpected error when joining region write tasks");
                        }
                    }
                }
            }
        }
        WRITE_ROWS_TOTAL
            .with_label_values(&["put"])
            .inc_by(put_rows as u64);
        WRITE_ROWS_TOTAL
            .with_label_values(&["delete"])
            .inc_by(delete_rows as u64);
    }

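    /// Handles all stalled write requests.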
    pub(crate) async fn handle_stalled_requests(&mut self) {
        let stalled = std::mem::take(&mut self.stalled_requests);
        self.stalled_count.sub(stalled.stalled_count() as i64);
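        // These requests were already stalled once, don't stall them again.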
        for (_, (_, mut requests)) in stalled.requests {
            self.handle_write_requests(&mut requests, false).await;
        }
    }

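    /// Rejects all stalled requests.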
    pub(crate) fn reject_stalled_requests(&mut self) {
        let stalled = std::mem::take(&mut self.stalled_requests);
        self.stalled_count.sub(stalled.stalled_count() as i64);
        for (_, (_, mut requests)) in stalled.requests {
            reject_write_requests(&mut requests);
        }
    }

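    /// Rejects stalled requests of a specific region.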
    pub(crate) fn reject_region_stalled_requests(&mut self, region_id: &RegionId) {
        debug!("Rejects stalled requests for region {}", region_id);
        let mut requests = self.stalled_requests.remove(region_id);
        self.stalled_count.sub(requests.len() as i64);
        reject_write_requests(&mut requests);
    }

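    /// Handles stalled requests of a specific region.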
    pub(crate) async fn handle_region_stalled_requests(&mut self, region_id: &RegionId) {
        debug!("Handles stalled requests for region {}", region_id);
        let mut requests = self.stalled_requests.remove(region_id);
        self.stalled_count.sub(requests.len() as i64);
        self.handle_write_requests(&mut requests, true).await;
    }
}

impl<S> RegionWorkerLoop<S> {
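    /// Validates the write requests and groups them by region into write contexts.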
    fn prepare_region_write_ctx(
        &mut self,
        write_requests: &mut Vec<SenderWriteRequest>,
    ) -> HashMap<RegionId, RegionWriteCtx> {
        let mut region_ctxs = HashMap::new();
        for mut sender_req in write_requests.drain(..) {
            let region_id = sender_req.request.region_id;

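            // The region has pending DDLs, add the write request to its pending list.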
            if self.flush_scheduler.has_pending_ddls(region_id) {
                self.flush_scheduler
                    .add_write_request_to_pending(sender_req);
                continue;
            }

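            // Initialize a write context for the region on first access.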
            if let hash_map::Entry::Vacant(e) = region_ctxs.entry(region_id) {
                let Some(region) = self
                    .regions
                    .get_region_or(region_id, &mut sender_req.sender)
                else {
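                    // No such region.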
                    continue;
                };
                match region.state() {
                    RegionRoleState::Leader(RegionLeaderState::Writable) => {
                        let region_ctx = RegionWriteCtx::new(
                            region.region_id,
                            &region.version_control,
                            region.provider.clone(),
                        );

                        e.insert(region_ctx);
                    }
                    RegionRoleState::Leader(RegionLeaderState::Altering) => {
                        debug!(
                            "Region {} is altering, add request to pending writes",
                            region.region_id
                        );
                        self.stalled_count.add(1);
                        self.stalled_requests.push(sender_req);
                        continue;
                    }
                    state => {
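                        // The region is not writable, return an error to the sender.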
                        sender_req.sender.send(
                            RegionStateSnafu {
                                region_id,
                                state,
                                expect: RegionRoleState::Leader(RegionLeaderState::Writable),
                            }
                            .fail(),
                        );
                        continue;
                    }
                }
            }

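            // Safety: the context must exist at this point, otherwise the loop
            // above has already continued.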
            let region_ctx = region_ctxs.get_mut(&region_id).unwrap();

            if let Err(e) = check_op_type(
                region_ctx.version().options.append_mode,
                &sender_req.request,
            ) {
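                // Do not allow non-put operations under append mode.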
                sender_req.sender.send(Err(e));

                continue;
            }

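            // Fill missing columns if the request schema may differ from the region schema.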
            let need_fill_missing_columns =
                if let Some(ref region_metadata) = sender_req.request.region_metadata {
                    region_ctx.version().metadata.schema_version != region_metadata.schema_version
                } else {
                    true
                };
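            // Only fill missing columns under the dense primary key encoding.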
            if need_fill_missing_columns
                && sender_req.request.primary_key_encoding() == PrimaryKeyEncoding::Dense
            {
                if let Err(e) = sender_req
                    .request
                    .maybe_fill_missing_columns(&region_ctx.version().metadata)
                {
                    sender_req.sender.send(Err(e));

                    continue;
                }
            }

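            // Collect the mutation into the region's write context.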
            region_ctx.push_mutation(
                sender_req.request.op_type as i32,
                Some(sender_req.request.rows),
                sender_req.request.hint,
                sender_req.sender,
            );
        }

        region_ctxs
    }

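    /// Returns true if the worker should reject new write requests.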
    pub(crate) fn should_reject_write(&self) -> bool {
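        // Rejects writes once memory usage, including stalled requests, reaches
        // the global reject threshold.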
        self.write_buffer_manager.memory_usage() + self.stalled_requests.estimated_size
            >= self.config.global_write_buffer_reject_size.as_bytes() as usize
    }
}

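/// Sends a rejected error to all `write_requests`.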
fn reject_write_requests(write_requests: &mut Vec<SenderWriteRequest>) {
    WRITE_REJECT_TOTAL.inc_by(write_requests.len() as u64);

    for req in write_requests.drain(..) {
        req.sender.send(
            RejectWriteSnafu {
                region_id: req.request.region_id,
            }
            .fail(),
        );
    }
}

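/// Checks whether the `OpType` of the request is allowed for the region.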
fn check_op_type(append_mode: bool, request: &WriteRequest) -> Result<()> {
    if append_mode {
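        // Only accept put requests under append mode.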
        ensure!(
            request.op_type == OpType::Put,
            InvalidRequestSnafu {
                region_id: request.region_id,
                reason: "DELETE is not allowed under append mode",
            }
        );
    }

    Ok(())
}