mito2/worker/handle_write.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Handling write requests.
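//!
//! A write request travels through the following stages: it is first checked
//! against memory pressure (reject or stall), then grouped into a per-region
//! [`RegionWriteCtx`], appended to the WAL, and finally applied to the
//! region's memtables.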

use std::collections::{HashMap, hash_map};
use std::sync::Arc;

use api::v1::OpType;
use common_telemetry::{debug, error};
use common_wal::options::WalOptions;
use snafu::ensure;
use store_api::codec::PrimaryKeyEncoding;
use store_api::logstore::LogStore;
use store_api::storage::RegionId;

use crate::error::{InvalidRequestSnafu, RegionStateSnafu, RejectWriteSnafu, Result};
use crate::metrics;
use crate::metrics::{
    WRITE_REJECT_TOTAL, WRITE_ROWS_TOTAL, WRITE_STAGE_ELAPSED, WRITE_STALL_TOTAL,
};
use crate::region::{RegionLeaderState, RegionRoleState};
use crate::region_write_ctx::RegionWriteCtx;
use crate::request::{SenderBulkRequest, SenderWriteRequest, WriteRequest};
use crate::worker::RegionWorkerLoop;

impl<S: LogStore> RegionWorkerLoop<S> {
    /// Takes and handles all write requests.
    pub(crate) async fn handle_write_requests(
        &mut self,
        write_requests: &mut Vec<SenderWriteRequest>,
        bulk_requests: &mut Vec<SenderBulkRequest>,
        allow_stall: bool,
    ) {
        if write_requests.is_empty() && bulk_requests.is_empty() {
            return;
        }

        // Flush this worker if the engine needs to flush.
        self.maybe_flush_worker();

        if self.should_reject_write() {
            // The memory pressure is still too high; reject the write requests.
            reject_write_requests(write_requests, bulk_requests);
            // Also reject all stalled requests.
            self.reject_stalled_requests();
            return;
        }

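        // Under softer memory pressure, stall (queue) the requests instead of
        // rejecting them. `allow_stall` is false when the caller is re-submitting
        // requests that were already stalled once.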
        if self.write_buffer_manager.should_stall() && allow_stall {
            let stalled_count = (write_requests.len() + bulk_requests.len()) as i64;
            self.stalling_count.add(stalled_count);
            WRITE_STALL_TOTAL.inc_by(stalled_count as u64);
            self.stalled_requests.append(write_requests, bulk_requests);
            self.listener.on_write_stall();
            return;
        }

        // Prepare write context.
        let mut region_ctxs = {
            let _timer = WRITE_STAGE_ELAPSED
                .with_label_values(&["prepare_ctx"])
                .start_timer();
            self.prepare_region_write_ctx(write_requests, bulk_requests)
        };

        // Write to WAL.
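        // All regions' entries are staged into a single WAL writer and flushed
        // with one batched `write_to_wal` call; regions configured with
        // `WalOptions::Noop` are skipped.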
        {
            let _timer = WRITE_STAGE_ELAPSED
                .with_label_values(&["write_wal"])
                .start_timer();
            let mut wal_writer = self.wal.writer();
            for region_ctx in region_ctxs.values_mut() {
                if let WalOptions::Noop = &region_ctx.version().options.wal_options {
                    // Skip the WAL write for noop regions.
                    continue;
                }
                if let Err(e) = region_ctx.add_wal_entry(&mut wal_writer).map_err(Arc::new) {
                    region_ctx.set_error(e);
                }
            }
            match wal_writer.write_to_wal().await.map_err(Arc::new) {
                Ok(response) => {
                    for (region_id, region_ctx) in region_ctxs.iter_mut() {
                        if let WalOptions::Noop = &region_ctx.version().options.wal_options {
                            continue;
                        }

                        // Safety: the log store implementation ensures that either `write_to_wal`
                        // fails and no response is returned, or the last entry id for each region
                        // exists in the response.
                        let last_entry_id = response.last_entry_ids.get(region_id).unwrap();
                        region_ctx.set_next_entry_id(last_entry_id + 1);
                    }
                }
                Err(e) => {
                    // Failed to write the WAL; fail all requests in this batch.
                    for mut region_ctx in region_ctxs.into_values() {
                        region_ctx.set_error(e.clone());
                    }
                    return;
                }
            }
        }

        let (mut put_rows, mut delete_rows) = (0, 0);
        // Write to memtables.
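        // A single region is written inline on the worker thread; multiple
        // regions are written concurrently on spawned tasks and joined below.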
        {
            let _timer = WRITE_STAGE_ELAPSED
                .with_label_values(&["write_memtable"])
                .start_timer();
            if region_ctxs.len() == 1 {
                // Fast path for a single region.
                let mut region_ctx = region_ctxs.into_values().next().unwrap();
                region_ctx.write_memtable().await;
                region_ctx.write_bulk().await;
                put_rows += region_ctx.put_num;
                delete_rows += region_ctx.delete_num;
            } else {
                let region_write_task = region_ctxs
                    .into_values()
                    .map(|mut region_ctx| {
                        // Use the tokio runtime to schedule tasks.
                        common_runtime::spawn_global(async move {
                            region_ctx.write_memtable().await;
                            region_ctx.write_bulk().await;
                            (region_ctx.put_num, region_ctx.delete_num)
                        })
                    })
                    .collect::<Vec<_>>();

                for result in futures::future::join_all(region_write_task).await {
                    match result {
                        Ok((put, delete)) => {
                            put_rows += put;
                            delete_rows += delete;
                        }
                        Err(e) => {
                            error!(e; "unexpected error when joining region write tasks");
                        }
                    }
                }
            }
        }
        WRITE_ROWS_TOTAL
            .with_label_values(&["put"])
            .inc_by(put_rows as u64);
        WRITE_ROWS_TOTAL
            .with_label_values(&["delete"])
            .inc_by(delete_rows as u64);
    }

    /// Handles all stalled write requests.
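    ///
    /// Stalled requests are re-submitted with `allow_stall = false`, so this
    /// time they are either written or rejected instead of being stalled again.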
    pub(crate) async fn handle_stalled_requests(&mut self) {
        // Handle stalled requests.
        let stalled = std::mem::take(&mut self.stalled_requests);
        self.stalling_count.sub(stalled.stalled_count() as i64);
        // We already stalled these requests, so don't stall them again.
        for (_, (_, mut requests, mut bulk)) in stalled.requests {
            self.handle_write_requests(&mut requests, &mut bulk, false)
                .await;
        }
    }

    /// Rejects all stalled requests.
    pub(crate) fn reject_stalled_requests(&mut self) {
        let stalled = std::mem::take(&mut self.stalled_requests);
        self.stalling_count.sub(stalled.stalled_count() as i64);
        for (_, (_, mut requests, mut bulk)) in stalled.requests {
            reject_write_requests(&mut requests, &mut bulk);
        }
    }

    /// Rejects a specific region's stalled requests.
    pub(crate) fn reject_region_stalled_requests(&mut self, region_id: &RegionId) {
        debug!("Rejects stalled requests for region {}", region_id);
        let (mut requests, mut bulk) = self.stalled_requests.remove(region_id);
        self.stalling_count
            .sub((requests.len() + bulk.len()) as i64);
        reject_write_requests(&mut requests, &mut bulk);
    }

    /// Handles a specific region's stalled requests.
    pub(crate) async fn handle_region_stalled_requests(&mut self, region_id: &RegionId) {
        debug!("Handles stalled requests for region {}", region_id);
        let (mut requests, mut bulk) = self.stalled_requests.remove(region_id);
        self.stalling_count
            .sub((requests.len() + bulk.len()) as i64);
        self.handle_write_requests(&mut requests, &mut bulk, true)
            .await;
    }
}

impl<S> RegionWorkerLoop<S> {
    /// Validates and groups requests by region.
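    ///
    /// Returns a map from region id to its [`RegionWriteCtx`]. Requests for
    /// missing or non-writable regions are answered with errors here, and
    /// requests for altering regions are moved to the stalled queue.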
    fn prepare_region_write_ctx(
        &mut self,
        write_requests: &mut Vec<SenderWriteRequest>,
        bulk_requests: &mut Vec<SenderBulkRequest>,
    ) -> HashMap<RegionId, RegionWriteCtx> {
        // Initialize region write context map.
        let mut region_ctxs = HashMap::new();
        self.process_write_requests(&mut region_ctxs, write_requests);
        self.process_bulk_requests(&mut region_ctxs, bulk_requests);
        region_ctxs
    }

    fn process_write_requests(
        &mut self,
        region_ctxs: &mut HashMap<RegionId, RegionWriteCtx>,
        write_requests: &mut Vec<SenderWriteRequest>,
    ) {
        for mut sender_req in write_requests.drain(..) {
            let region_id = sender_req.request.region_id;

            // If the region is waiting for alteration, add the request to the pending writes.
            if self.flush_scheduler.has_pending_ddls(region_id) {
                // TODO(yingwen): consider adding some metrics for this.
                // Safety: The region has pending ddls.
                self.flush_scheduler
                    .add_write_request_to_pending(sender_req);
                continue;
            }

            // Checks whether the region exists and whether it is stalling.
            if let hash_map::Entry::Vacant(e) = region_ctxs.entry(region_id) {
                let Some(region) = self
                    .regions
                    .get_region_or(region_id, &mut sender_req.sender)
                else {
                    // No such region.
                    continue;
                };
                match region.state() {
                    RegionRoleState::Leader(RegionLeaderState::Writable)
                    | RegionRoleState::Leader(RegionLeaderState::Staging) => {
                        let region_ctx = RegionWriteCtx::new(
                            region.region_id,
                            &region.version_control,
                            region.provider.clone(),
                            Some(region.written_bytes.clone()),
                        );

                        e.insert(region_ctx);
                    }
                    RegionRoleState::Leader(RegionLeaderState::Altering) => {
                        debug!(
                            "Region {} is altering, add request to pending writes",
                            region.region_id
                        );
                        self.stalling_count.add(1);
                        WRITE_STALL_TOTAL.inc();
                        self.stalled_requests.push(sender_req);
                        continue;
                    }
                    state => {
                        // The region is not writable.
                        sender_req.sender.send(
                            RegionStateSnafu {
                                region_id,
                                state,
                                expect: RegionRoleState::Leader(RegionLeaderState::Writable),
                            }
                            .fail(),
                        );
                        continue;
                    }
                }
            }

            // Safety: we just ensured above that the region exists in the map.
            let region_ctx = region_ctxs.get_mut(&region_id).unwrap();

            if let Err(e) = check_op_type(
                region_ctx.version().options.append_mode,
                &sender_req.request,
            ) {
                // Do not allow non-put ops under append mode.
                sender_req.sender.send(Err(e));

                continue;
            }

            // Double-check the request schema.
            let need_fill_missing_columns =
                if let Some(ref region_metadata) = sender_req.request.region_metadata {
                    region_ctx.version().metadata.schema_version != region_metadata.schema_version
                } else {
                    true
                };
            // Only fill missing columns if the primary key is dense-encoded.
            if need_fill_missing_columns
                && sender_req.request.primary_key_encoding() == PrimaryKeyEncoding::Dense
                && let Err(e) = sender_req
                    .request
                    .maybe_fill_missing_columns(&region_ctx.version().metadata)
            {
                sender_req.sender.send(Err(e));

                continue;
            }

            // Collect requests by region.
            region_ctx.push_mutation(
                sender_req.request.op_type as i32,
                Some(sender_req.request.rows),
                sender_req.request.hint,
                sender_req.sender,
                None,
            );
        }
    }

    /// Processes bulk insert requests.
    fn process_bulk_requests(
        &mut self,
        region_ctxs: &mut HashMap<RegionId, RegionWriteCtx>,
        requests: &mut Vec<SenderBulkRequest>,
    ) {
        let _timer = metrics::REGION_WORKER_HANDLE_WRITE_ELAPSED
            .with_label_values(&["prepare_bulk_request"])
            .start_timer();
        for mut bulk_req in requests.drain(..) {
            let region_id = bulk_req.region_id;
            // If the region is waiting for alteration, add the request to the pending writes.
            if self.flush_scheduler.has_pending_ddls(region_id) {
                // Safety: The region has pending ddls.
                self.flush_scheduler.add_bulk_request_to_pending(bulk_req);
                continue;
            }

            // Checks whether the region exists and whether it is stalling.
            if let hash_map::Entry::Vacant(e) = region_ctxs.entry(region_id) {
                let Some(region) = self.regions.get_region_or(region_id, &mut bulk_req.sender)
                else {
                    continue;
                };
                match region.state() {
                    RegionRoleState::Leader(RegionLeaderState::Writable) => {
                        let region_ctx = RegionWriteCtx::new(
                            region.region_id,
                            &region.version_control,
                            region.provider.clone(),
                            Some(region.written_bytes.clone()),
                        );

                        e.insert(region_ctx);
                    }
                    RegionRoleState::Leader(RegionLeaderState::Altering) => {
                        debug!(
                            "Region {} is altering, add request to pending writes",
                            region.region_id
                        );
                        self.stalling_count.add(1);
                        WRITE_STALL_TOTAL.inc();
                        self.stalled_requests.push_bulk(bulk_req);
                        continue;
                    }
                    state => {
                        // The region is not writable.
                        bulk_req.sender.send(
                            RegionStateSnafu {
                                region_id,
                                state,
                                expect: RegionRoleState::Leader(RegionLeaderState::Writable),
                            }
                            .fail(),
                        );
                        continue;
                    }
                }
            }

            // Safety: we just ensured above that the region exists in the map.
            let region_ctx = region_ctxs.get_mut(&region_id).unwrap();

            // Double-check the request schema.
            let need_fill_missing_columns = region_ctx.version().metadata.schema_version
                != bulk_req.region_metadata.schema_version;

            // Bulk requests can't fill missing columns yet, so reject the request
            // on any schema version mismatch. Don't abort the whole batch on a
            // single bad request.
            if need_fill_missing_columns {
                // todo(hl): support filling default columns
                bulk_req.sender.send(
                    InvalidRequestSnafu {
                        region_id,
                        reason: "Schema mismatch",
                    }
                    .fail(),
                );
                continue;
            }

            // Collect requests by region.
            if !region_ctx.push_bulk(bulk_req.sender, bulk_req.request, None) {
                continue;
            }
        }
    }

    /// Returns true if the engine needs to reject some write requests.
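    ///
    /// The check accounts for the write buffer's memory usage plus the
    /// estimated size of the stalled requests, compared against the configured
    /// `global_write_buffer_reject_size`.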
    pub(crate) fn should_reject_write(&self) -> bool {
        // Returns true if memory usage, including stalled requests, reaches the high threshold.
        self.write_buffer_manager.memory_usage() + self.stalled_requests.estimated_size
            >= self.config.global_write_buffer_reject_size.as_bytes() as usize
    }
}

/// Sends a rejected error to all `write_requests` and `bulk_requests`.
fn reject_write_requests(
    write_requests: &mut Vec<SenderWriteRequest>,
    bulk_requests: &mut Vec<SenderBulkRequest>,
) {
    WRITE_REJECT_TOTAL.inc_by(write_requests.len() as u64);

    for req in write_requests.drain(..) {
        req.sender.send(
            RejectWriteSnafu {
                region_id: req.request.region_id,
            }
            .fail(),
        );
    }
    for req in bulk_requests.drain(..) {
        let region_id = req.region_id;
        req.sender.send(RejectWriteSnafu { region_id }.fail());
    }
}

/// Rejects delete requests under append mode.
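///
/// A minimal sketch of the expected behavior, assuming hypothetical `put_req`
/// and `delete_req` values built elsewhere:
///
/// ```ignore
/// assert!(check_op_type(true, &put_req).is_ok());
/// assert!(check_op_type(false, &delete_req).is_ok());
/// assert!(check_op_type(true, &delete_req).is_err());
/// ```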
fn check_op_type(append_mode: bool, request: &WriteRequest) -> Result<()> {
    if append_mode {
        ensure!(
            request.op_type == OpType::Put,
            InvalidRequestSnafu {
                region_id: request.region_id,
                reason: "DELETE is not allowed under append mode",
            }
        );
    }

    Ok(())
}