// mito2/src/worker/handle_write.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Handling write requests.
16
17use std::collections::{hash_map, HashMap};
18use std::sync::Arc;
19
20use api::v1::OpType;
21use common_telemetry::{debug, error};
22use common_wal::options::WalOptions;
23use snafu::ensure;
24use store_api::codec::PrimaryKeyEncoding;
25use store_api::logstore::LogStore;
26use store_api::storage::RegionId;
27
28use crate::error::{InvalidRequestSnafu, RegionStateSnafu, RejectWriteSnafu, Result};
29use crate::metrics;
30use crate::metrics::{
31    WRITE_REJECT_TOTAL, WRITE_ROWS_TOTAL, WRITE_STAGE_ELAPSED, WRITE_STALL_TOTAL,
32};
33use crate::region::{RegionLeaderState, RegionRoleState};
34use crate::region_write_ctx::RegionWriteCtx;
35use crate::request::{SenderBulkRequest, SenderWriteRequest, WriteRequest};
36use crate::worker::RegionWorkerLoop;
37
impl<S: LogStore> RegionWorkerLoop<S> {
    /// Takes and handles all write requests.
    ///
    /// Drains `write_requests` and `bulk_requests`. Depending on engine state,
    /// requests are either rejected (memory pressure above the reject threshold),
    /// parked as stalled (buffer manager asks to stall and `allow_stall` is true),
    /// or written: first to the WAL, then to the memtables. `allow_stall` is false
    /// when replaying already-stalled requests so they cannot stall a second time.
    pub(crate) async fn handle_write_requests(
        &mut self,
        write_requests: &mut Vec<SenderWriteRequest>,
        bulk_requests: &mut Vec<SenderBulkRequest>,
        allow_stall: bool,
    ) {
        if write_requests.is_empty() && bulk_requests.is_empty() {
            return;
        }

        // Flush this worker if the engine needs to flush.
        self.maybe_flush_worker();

        if self.should_reject_write() {
            // The memory pressure is still too high, reject write requests.
            reject_write_requests(write_requests, bulk_requests);
            // Also reject all stalled requests.
            self.reject_stalled_requests();
            return;
        }

        if self.write_buffer_manager.should_stall() && allow_stall {
            // Park the requests for a later retry instead of failing them;
            // update the stall gauge/counter so the backlog is observable.
            let stalled_count = (write_requests.len() + bulk_requests.len()) as i64;
            self.stalling_count.add(stalled_count);
            WRITE_STALL_TOTAL.inc_by(stalled_count as u64);
            self.stalled_requests.append(write_requests, bulk_requests);
            self.listener.on_write_stall();
            return;
        }

        // Prepare write context: validate requests and group them per region.
        let mut region_ctxs = {
            let _timer = WRITE_STAGE_ELAPSED
                .with_label_values(&["prepare_ctx"])
                .start_timer();
            self.prepare_region_write_ctx(write_requests, bulk_requests)
        };

        // Write to WAL.
        {
            let _timer = WRITE_STAGE_ELAPSED
                .with_label_values(&["write_wal"])
                .start_timer();
            let mut wal_writer = self.wal.writer();
            // Stage entries for every region first; a failure to encode one
            // region's entry only marks that region's context as errored.
            for region_ctx in region_ctxs.values_mut() {
                if let WalOptions::Noop = &region_ctx.version().options.wal_options {
                    // Skip wal write for noop region.
                    continue;
                }
                if let Err(e) = region_ctx.add_wal_entry(&mut wal_writer).map_err(Arc::new) {
                    region_ctx.set_error(e);
                }
            }
            match wal_writer.write_to_wal().await.map_err(Arc::new) {
                Ok(response) => {
                    for (region_id, region_ctx) in region_ctxs.iter_mut() {
                        if let WalOptions::Noop = &region_ctx.version().options.wal_options {
                            continue;
                        }

                        // Safety: the log store implementation ensures that either the `write_to_wal` fails and no
                        // response is returned or the last entry ids for each region do exist.
                        let last_entry_id = response.last_entry_ids.get(region_id).unwrap();
                        region_ctx.set_next_entry_id(last_entry_id + 1);
                    }
                }
                Err(e) => {
                    // Failed to write wal: fail every region in this batch with a
                    // shared (Arc-wrapped) error and abort before touching memtables.
                    for mut region_ctx in region_ctxs.into_values() {
                        region_ctx.set_error(e.clone());
                    }
                    return;
                }
            }
        }

        let (mut put_rows, mut delete_rows) = (0, 0);
        // Write to memtables.
        {
            let _timer = WRITE_STAGE_ELAPSED
                .with_label_values(&["write_memtable"])
                .start_timer();
            if region_ctxs.len() == 1 {
                // fast path for single region: write inline, no task spawn overhead.
                let mut region_ctx = region_ctxs.into_values().next().unwrap();
                region_ctx.write_memtable().await;
                region_ctx.write_bulk().await;
                put_rows += region_ctx.put_num;
                delete_rows += region_ctx.delete_num;
            } else {
                // Multiple regions: write each region's memtable concurrently.
                let region_write_task = region_ctxs
                    .into_values()
                    .map(|mut region_ctx| {
                        // use tokio runtime to schedule tasks.
                        common_runtime::spawn_global(async move {
                            region_ctx.write_memtable().await;
                            region_ctx.write_bulk().await;
                            (region_ctx.put_num, region_ctx.delete_num)
                        })
                    })
                    .collect::<Vec<_>>();

                for result in futures::future::join_all(region_write_task).await {
                    match result {
                        Ok((put, delete)) => {
                            put_rows += put;
                            delete_rows += delete;
                        }
                        Err(e) => {
                            // A join error means the spawned task panicked/was
                            // cancelled; log it but keep accounting the others.
                            error!(e; "unexpected error when joining region write tasks");
                        }
                    }
                }
            }
        }
        WRITE_ROWS_TOTAL
            .with_label_values(&["put"])
            .inc_by(put_rows as u64);
        WRITE_ROWS_TOTAL
            .with_label_values(&["delete"])
            .inc_by(delete_rows as u64);
    }

    /// Handles all stalled write requests.
    ///
    /// Replays every parked request batch through [`Self::handle_write_requests`]
    /// with `allow_stall = false` so a batch cannot stall twice in a row.
    pub(crate) async fn handle_stalled_requests(&mut self) {
        // Handle stalled requests.
        let stalled = std::mem::take(&mut self.stalled_requests);
        self.stalling_count.sub(stalled.stalled_count() as i64);
        // We already stalled these requests, don't stall them again.
        for (_, (_, mut requests, mut bulk)) in stalled.requests {
            self.handle_write_requests(&mut requests, &mut bulk, false)
                .await;
        }
    }

    /// Rejects all stalled requests, sending a reject error to each sender.
    pub(crate) fn reject_stalled_requests(&mut self) {
        let stalled = std::mem::take(&mut self.stalled_requests);
        self.stalling_count.sub(stalled.stalled_count() as i64);
        for (_, (_, mut requests, mut bulk)) in stalled.requests {
            reject_write_requests(&mut requests, &mut bulk);
        }
    }

    /// Rejects a specific region's stalled requests.
    pub(crate) fn reject_region_stalled_requests(&mut self, region_id: &RegionId) {
        debug!("Rejects stalled requests for region {}", region_id);
        let (mut requests, mut bulk) = self.stalled_requests.remove(region_id);
        self.stalling_count
            .sub((requests.len() + bulk.len()) as i64);
        reject_write_requests(&mut requests, &mut bulk);
    }

    /// Handles a specific region's stalled requests.
    ///
    /// Unlike [`Self::handle_stalled_requests`], this replays with
    /// `allow_stall = true`, so the requests may be parked again.
    pub(crate) async fn handle_region_stalled_requests(&mut self, region_id: &RegionId) {
        debug!("Handles stalled requests for region {}", region_id);
        let (mut requests, mut bulk) = self.stalled_requests.remove(region_id);
        self.stalling_count
            .sub((requests.len() + bulk.len()) as i64);
        self.handle_write_requests(&mut requests, &mut bulk, true)
            .await;
    }
}
203
204impl<S> RegionWorkerLoop<S> {
205    /// Validates and groups requests by region.
206    fn prepare_region_write_ctx(
207        &mut self,
208        write_requests: &mut Vec<SenderWriteRequest>,
209        bulk_requests: &mut Vec<SenderBulkRequest>,
210    ) -> HashMap<RegionId, RegionWriteCtx> {
211        // Initialize region write context map.
212        let mut region_ctxs = HashMap::new();
213        self.process_write_requests(&mut region_ctxs, write_requests);
214        self.process_bulk_requests(&mut region_ctxs, bulk_requests);
215        region_ctxs
216    }
217
218    fn process_write_requests(
219        &mut self,
220        region_ctxs: &mut HashMap<RegionId, RegionWriteCtx>,
221        write_requests: &mut Vec<SenderWriteRequest>,
222    ) {
223        for mut sender_req in write_requests.drain(..) {
224            let region_id = sender_req.request.region_id;
225
226            // If region is waiting for alteration, add requests to pending writes.
227            if self.flush_scheduler.has_pending_ddls(region_id) {
228                // TODO(yingwen): consider adding some metrics for this.
229                // Safety: The region has pending ddls.
230                self.flush_scheduler
231                    .add_write_request_to_pending(sender_req);
232                continue;
233            }
234
235            // Checks whether the region exists and is it stalling.
236            if let hash_map::Entry::Vacant(e) = region_ctxs.entry(region_id) {
237                let Some(region) = self
238                    .regions
239                    .get_region_or(region_id, &mut sender_req.sender)
240                else {
241                    // No such region.
242                    continue;
243                };
244                match region.state() {
245                    RegionRoleState::Leader(RegionLeaderState::Writable) => {
246                        let region_ctx = RegionWriteCtx::new(
247                            region.region_id,
248                            &region.version_control,
249                            region.provider.clone(),
250                        );
251
252                        e.insert(region_ctx);
253                    }
254                    RegionRoleState::Leader(RegionLeaderState::Altering) => {
255                        debug!(
256                            "Region {} is altering, add request to pending writes",
257                            region.region_id
258                        );
259                        self.stalling_count.add(1);
260                        WRITE_STALL_TOTAL.inc();
261                        self.stalled_requests.push(sender_req);
262                        continue;
263                    }
264                    state => {
265                        // The region is not writable.
266                        sender_req.sender.send(
267                            RegionStateSnafu {
268                                region_id,
269                                state,
270                                expect: RegionRoleState::Leader(RegionLeaderState::Writable),
271                            }
272                            .fail(),
273                        );
274                        continue;
275                    }
276                }
277            }
278
279            // Safety: Now we ensure the region exists.
280            let region_ctx = region_ctxs.get_mut(&region_id).unwrap();
281
282            if let Err(e) = check_op_type(
283                region_ctx.version().options.append_mode,
284                &sender_req.request,
285            ) {
286                // Do not allow non-put op under append mode.
287                sender_req.sender.send(Err(e));
288
289                continue;
290            }
291
292            // Double check the request schema
293            let need_fill_missing_columns =
294                if let Some(ref region_metadata) = sender_req.request.region_metadata {
295                    region_ctx.version().metadata.schema_version != region_metadata.schema_version
296                } else {
297                    true
298                };
299            // Only fill missing columns if primary key is dense encoded.
300            if need_fill_missing_columns
301                && sender_req.request.primary_key_encoding() == PrimaryKeyEncoding::Dense
302            {
303                if let Err(e) = sender_req
304                    .request
305                    .maybe_fill_missing_columns(&region_ctx.version().metadata)
306                {
307                    sender_req.sender.send(Err(e));
308
309                    continue;
310                }
311            }
312
313            // Collect requests by region.
314            region_ctx.push_mutation(
315                sender_req.request.op_type as i32,
316                Some(sender_req.request.rows),
317                sender_req.request.hint,
318                sender_req.sender,
319            );
320        }
321    }
322
323    /// Processes bulk insert requests.
324    fn process_bulk_requests(
325        &mut self,
326        region_ctxs: &mut HashMap<RegionId, RegionWriteCtx>,
327        requests: &mut Vec<SenderBulkRequest>,
328    ) {
329        let _timer = metrics::REGION_WORKER_HANDLE_WRITE_ELAPSED
330            .with_label_values(&["prepare_bulk_request"])
331            .start_timer();
332        for mut bulk_req in requests.drain(..) {
333            let region_id = bulk_req.region_id;
334            // If region is waiting for alteration, add requests to pending writes.
335            if self.flush_scheduler.has_pending_ddls(region_id) {
336                // Safety: The region has pending ddls.
337                self.flush_scheduler.add_bulk_request_to_pending(bulk_req);
338                continue;
339            }
340
341            // Checks whether the region exists and is it stalling.
342            if let hash_map::Entry::Vacant(e) = region_ctxs.entry(region_id) {
343                let Some(region) = self.regions.get_region_or(region_id, &mut bulk_req.sender)
344                else {
345                    continue;
346                };
347                match region.state() {
348                    RegionRoleState::Leader(RegionLeaderState::Writable) => {
349                        let region_ctx = RegionWriteCtx::new(
350                            region.region_id,
351                            &region.version_control,
352                            region.provider.clone(),
353                        );
354
355                        e.insert(region_ctx);
356                    }
357                    RegionRoleState::Leader(RegionLeaderState::Altering) => {
358                        debug!(
359                            "Region {} is altering, add request to pending writes",
360                            region.region_id
361                        );
362                        self.stalling_count.add(1);
363                        WRITE_STALL_TOTAL.inc();
364                        self.stalled_requests.push_bulk(bulk_req);
365                        continue;
366                    }
367                    state => {
368                        // The region is not writable.
369                        bulk_req.sender.send(
370                            RegionStateSnafu {
371                                region_id,
372                                state,
373                                expect: RegionRoleState::Leader(RegionLeaderState::Writable),
374                            }
375                            .fail(),
376                        );
377                        continue;
378                    }
379                }
380            }
381
382            // Safety: Now we ensure the region exists.
383            let region_ctx = region_ctxs.get_mut(&region_id).unwrap();
384
385            // Double-check the request schema
386            let need_fill_missing_columns = region_ctx.version().metadata.schema_version
387                != bulk_req.region_metadata.schema_version;
388
389            // Only fill missing columns if primary key is dense encoded.
390            if need_fill_missing_columns {
391                // todo(hl): support filling default columns
392                bulk_req.sender.send(
393                    InvalidRequestSnafu {
394                        region_id,
395                        reason: "Schema mismatch",
396                    }
397                    .fail(),
398                );
399                return;
400            }
401
402            // Collect requests by region.
403            if !region_ctx.push_bulk(bulk_req.sender, bulk_req.request) {
404                return;
405            }
406        }
407    }
408
409    /// Returns true if the engine needs to reject some write requests.
410    pub(crate) fn should_reject_write(&self) -> bool {
411        // If memory usage reaches high threshold (we should also consider stalled requests) returns true.
412        self.write_buffer_manager.memory_usage() + self.stalled_requests.estimated_size
413            >= self.config.global_write_buffer_reject_size.as_bytes() as usize
414    }
415}
416
417/// Send rejected error to all `write_requests`.
418fn reject_write_requests(
419    write_requests: &mut Vec<SenderWriteRequest>,
420    bulk_requests: &mut Vec<SenderBulkRequest>,
421) {
422    WRITE_REJECT_TOTAL.inc_by(write_requests.len() as u64);
423
424    for req in write_requests.drain(..) {
425        req.sender.send(
426            RejectWriteSnafu {
427                region_id: req.request.region_id,
428            }
429            .fail(),
430        );
431    }
432    for req in bulk_requests.drain(..) {
433        let region_id = req.region_id;
434        req.sender.send(RejectWriteSnafu { region_id }.fail());
435    }
436}
437
438/// Rejects delete request under append mode.
439fn check_op_type(append_mode: bool, request: &WriteRequest) -> Result<()> {
440    if append_mode {
441        ensure!(
442            request.op_type == OpType::Put,
443            InvalidRequestSnafu {
444                region_id: request.region_id,
445                reason: "DELETE is not allowed under append mode",
446            }
447        );
448    }
449
450    Ok(())
451}