// mito2/worker/handle_write.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Handling write requests.
17use std::collections::{HashMap, hash_map};
18use std::sync::Arc;
19
20use api::v1::OpType;
21use common_telemetry::{debug, error};
22use common_wal::options::WalOptions;
23use snafu::ensure;
24use store_api::codec::PrimaryKeyEncoding;
25use store_api::logstore::LogStore;
26use store_api::storage::RegionId;
27
28use crate::error::{
29    InvalidRequestSnafu, PartitionExprVersionMismatchSnafu, RegionStateSnafu, RejectWriteSnafu,
30    Result,
31};
32use crate::metrics;
33use crate::metrics::{
34    WRITE_REJECT_TOTAL, WRITE_ROWS_TOTAL, WRITE_STAGE_ELAPSED, WRITE_STALL_TOTAL,
35};
36use crate::region::{RegionLeaderState, RegionRoleState};
37use crate::region_write_ctx::RegionWriteCtx;
38use crate::request::{SenderBulkRequest, SenderWriteRequest, WriteRequest};
39use crate::worker::RegionWorkerLoop;
40
impl<S: LogStore> RegionWorkerLoop<S> {
    /// Takes and handles all write requests.
    ///
    /// Drains `write_requests` and `bulk_requests` and processes them in stages:
    /// rejects everything when memory pressure is past the reject threshold,
    /// stalls (queues) them when the write buffer should stall and `allow_stall`
    /// is true, otherwise groups them by region, writes to the WAL, and finally
    /// applies them to the memtables. Outcomes are delivered through each
    /// request's sender, so this function returns no value.
    pub(crate) async fn handle_write_requests(
        &mut self,
        write_requests: &mut Vec<SenderWriteRequest>,
        bulk_requests: &mut Vec<SenderBulkRequest>,
        allow_stall: bool,
    ) {
        if write_requests.is_empty() && bulk_requests.is_empty() {
            return;
        }

        // Flush this worker if the engine needs to flush.
        self.maybe_flush_worker();

        if self.should_reject_write() {
            // The memory pressure is still too high, reject write requests.
            reject_write_requests(write_requests, bulk_requests);
            // Also reject all stalled requests.
            self.reject_stalled_requests();
            return;
        }

        if self.write_buffer_manager.should_stall() && allow_stall {
            // Pressure is high but below the reject threshold: park the requests
            // in the stalled queue and notify the listener instead of writing now.
            let stalled_count = (write_requests.len() + bulk_requests.len()) as i64;
            self.stalling_count.add(stalled_count);
            WRITE_STALL_TOTAL.inc_by(stalled_count as u64);
            self.stalled_requests.append(write_requests, bulk_requests);
            self.listener.on_write_stall();
            return;
        }

        // Prepare write context: validate the requests and group them by region.
        let mut region_ctxs = {
            let _timer = WRITE_STAGE_ELAPSED
                .with_label_values(&["prepare_ctx"])
                .start_timer();
            self.prepare_region_write_ctx(write_requests, bulk_requests)
        };

        // Write to WAL.
        {
            let _timer = WRITE_STAGE_ELAPSED
                .with_label_values(&["write_wal"])
                .start_timer();
            let mut wal_writer = self.wal.writer();
            for region_ctx in region_ctxs.values_mut() {
                if let WalOptions::Noop = &region_ctx.version().options.wal_options {
                    // Skip wal write for noop region.
                    continue;
                }
                // An encoding failure is recorded on this region's context (so its
                // senders receive the error); other regions still proceed.
                if let Err(e) = region_ctx.add_wal_entry(&mut wal_writer).map_err(Arc::new) {
                    region_ctx.set_error(e);
                }
            }
            match wal_writer.write_to_wal().await.map_err(Arc::new) {
                Ok(response) => {
                    for (region_id, region_ctx) in region_ctxs.iter_mut() {
                        if let WalOptions::Noop = &region_ctx.version().options.wal_options {
                            continue;
                        }

                        // Safety: the log store implementation ensures that either the `write_to_wal` fails and no
                        // response is returned or the last entry ids for each region do exist.
                        let last_entry_id = response.last_entry_ids.get(region_id).unwrap();
                        region_ctx.set_next_entry_id(last_entry_id + 1);
                    }
                }
                Err(e) => {
                    // Failed to write wal. Broadcast the shared error (hence the
                    // `Arc`) to every region context and skip the memtable stage.
                    for mut region_ctx in region_ctxs.into_values() {
                        region_ctx.set_error(e.clone());
                    }
                    return;
                }
            }
        }

        let (mut put_rows, mut delete_rows) = (0, 0);
        // Write to memtables.
        {
            let _timer = WRITE_STAGE_ELAPSED
                .with_label_values(&["write_memtable"])
                .start_timer();
            if region_ctxs.len() == 1 {
                // fast path for single region: write inline without spawning tasks.
                let mut region_ctx = region_ctxs.into_values().next().unwrap();
                region_ctx.write_memtable().await;
                region_ctx.write_bulk().await;
                put_rows += region_ctx.put_num;
                delete_rows += region_ctx.delete_num;
            } else {
                // Multiple regions: write each region's memtable concurrently.
                let region_write_task = region_ctxs
                    .into_values()
                    .map(|mut region_ctx| {
                        // use tokio runtime to schedule tasks.
                        common_runtime::spawn_global(async move {
                            region_ctx.write_memtable().await;
                            region_ctx.write_bulk().await;
                            (region_ctx.put_num, region_ctx.delete_num)
                        })
                    })
                    .collect::<Vec<_>>();

                for result in futures::future::join_all(region_write_task).await {
                    match result {
                        Ok((put, delete)) => {
                            put_rows += put;
                            delete_rows += delete;
                        }
                        Err(e) => {
                            // A join error means the spawned task panicked or was
                            // cancelled; log it and keep aggregating the rest.
                            error!(e; "unexpected error when joining region write tasks");
                        }
                    }
                }
            }
        }
        WRITE_ROWS_TOTAL
            .with_label_values(&["put"])
            .inc_by(put_rows as u64);
        WRITE_ROWS_TOTAL
            .with_label_values(&["delete"])
            .inc_by(delete_rows as u64);
    }

    /// Handles all stalled write requests.
    ///
    /// Takes the whole stalled queue and replays each region's requests with
    /// `allow_stall = false`, so they are written or rejected but never queued
    /// a second time.
    pub(crate) async fn handle_stalled_requests(&mut self) {
        // Handle stalled requests.
        let stalled = std::mem::take(&mut self.stalled_requests);
        self.stalling_count.sub(stalled.stalled_count() as i64);
        // We already stalled these requests, don't stall them again.
        for (_, (_, mut requests, mut bulk)) in stalled.requests {
            self.handle_write_requests(&mut requests, &mut bulk, false)
                .await;
        }
    }

    /// Rejects all stalled requests.
    ///
    /// Every queued request's sender receives a reject error.
    pub(crate) fn reject_stalled_requests(&mut self) {
        let stalled = std::mem::take(&mut self.stalled_requests);
        self.stalling_count.sub(stalled.stalled_count() as i64);
        for (_, (_, mut requests, mut bulk)) in stalled.requests {
            reject_write_requests(&mut requests, &mut bulk);
        }
    }

    /// Rejects a specific region's stalled requests.
    pub(crate) fn reject_region_stalled_requests(&mut self, region_id: &RegionId) {
        debug!("Rejects stalled requests for region {}", region_id);
        let (mut requests, mut bulk) = self.stalled_requests.remove(region_id);
        self.stalling_count
            .sub((requests.len() + bulk.len()) as i64);
        reject_write_requests(&mut requests, &mut bulk);
    }

    /// Handles a specific region's stalled requests.
    ///
    /// Replays them with `allow_stall = true`, so they may be queued again if
    /// the write buffer is still under pressure.
    pub(crate) async fn handle_region_stalled_requests(&mut self, region_id: &RegionId) {
        debug!("Handles stalled requests for region {}", region_id);
        let (mut requests, mut bulk) = self.stalled_requests.remove(region_id);
        self.stalling_count
            .sub((requests.len() + bulk.len()) as i64);
        self.handle_write_requests(&mut requests, &mut bulk, true)
            .await;
    }
}
206
impl<S> RegionWorkerLoop<S> {
    /// Validates and groups requests by region.
    ///
    /// Returns a map from region id to the `RegionWriteCtx` that collects the
    /// validated mutations for that region. Invalid requests are answered
    /// through their senders and dropped along the way.
    fn prepare_region_write_ctx(
        &mut self,
        write_requests: &mut Vec<SenderWriteRequest>,
        bulk_requests: &mut Vec<SenderBulkRequest>,
    ) -> HashMap<RegionId, RegionWriteCtx> {
        // Initialize region write context map.
        let mut region_ctxs = HashMap::new();
        self.process_write_requests(&mut region_ctxs, write_requests);
        self.process_bulk_requests(&mut region_ctxs, bulk_requests);
        region_ctxs
    }

    /// Validates row write requests and pushes them into the per-region
    /// contexts in `region_ctxs`, creating a context on first use.
    ///
    /// Requests that fail validation (missing region, non-writable state,
    /// partition expr mismatch, forbidden op type, schema fill failure) are
    /// answered through their senders; requests for altering/entering-staging
    /// regions are stalled instead.
    fn process_write_requests(
        &mut self,
        region_ctxs: &mut HashMap<RegionId, RegionWriteCtx>,
        write_requests: &mut Vec<SenderWriteRequest>,
    ) {
        for mut sender_req in write_requests.drain(..) {
            let region_id = sender_req.request.region_id;

            // If region is waiting for alteration, add requests to pending writes.
            if self.flush_scheduler.has_pending_ddls(region_id) {
                // TODO(yingwen): consider adding some metrics for this.
                // Safety: The region has pending ddls.
                self.flush_scheduler
                    .add_write_request_to_pending(sender_req);
                continue;
            }

            // Checks whether the region exists and is it stalling.
            if let hash_map::Entry::Vacant(e) = region_ctxs.entry(region_id) {
                let Some(region) = self
                    .regions
                    .get_region_or(region_id, &mut sender_req.sender)
                else {
                    // No such region.
                    continue;
                };
                #[cfg(test)]
                debug!(
                    "Handling write request for region {}, state: {:?}",
                    region_id,
                    region.state()
                );
                match region.state() {
                    RegionRoleState::Leader(RegionLeaderState::Writable)
                    | RegionRoleState::Leader(RegionLeaderState::Staging) => {
                        if region.reject_all_writes_in_staging() {
                            sender_req
                                .sender
                                .send(RejectWriteSnafu { region_id }.fail());
                            continue;
                        }

                        // First request for this region: create its write context.
                        let region_ctx = RegionWriteCtx::new(
                            region.region_id,
                            &region.version_control,
                            region.provider.clone(),
                            Some(region.written_bytes.clone()),
                        );

                        e.insert(region_ctx);
                    }
                    RegionRoleState::Leader(RegionLeaderState::Altering) => {
                        debug!(
                            "Region {} is altering, add request to pending writes",
                            region.region_id
                        );
                        self.stalling_count.add(1);
                        WRITE_STALL_TOTAL.inc();
                        self.stalled_requests.push(sender_req);
                        continue;
                    }
                    RegionRoleState::Leader(RegionLeaderState::EnteringStaging) => {
                        debug!(
                            "Region {} is entering staging, add request to pending writes",
                            region.region_id
                        );
                        self.stalling_count.add(1);
                        WRITE_STALL_TOTAL.inc();
                        self.stalled_requests.push(sender_req);
                        continue;
                    }
                    state => {
                        // The region is not writable.
                        sender_req.sender.send(
                            RegionStateSnafu {
                                region_id,
                                state,
                                expect: RegionRoleState::Leader(RegionLeaderState::Writable),
                            }
                            .fail(),
                        );
                        continue;
                    }
                }
            }

            // Safety: Now we ensure the region exists.
            let region_ctx = region_ctxs.get_mut(&region_id).unwrap();
            // Fetch the region again: the binding above is scoped to the
            // vacant-entry branch and is not visible here.
            let Some(region) = self
                .regions
                .get_region_or(region_id, &mut sender_req.sender)
            else {
                continue;
            };
            if region.reject_all_writes_in_staging() {
                sender_req
                    .sender
                    .send(RejectWriteSnafu { region_id }.fail());
                continue;
            }
            let expected_version = region.expected_partition_expr_version();
            if let Err(e) = check_partition_expr_version(
                region_id,
                expected_version,
                sender_req.request.partition_expr_version,
            ) {
                sender_req.sender.send(Err(e));
                continue;
            }

            if let Err(e) = check_op_type(
                region_ctx.version().options.append_mode,
                &sender_req.request,
            ) {
                // Do not allow non-put op under append mode.
                sender_req.sender.send(Err(e));

                continue;
            }

            // Double check the request schema: if the request carries no region
            // metadata, conservatively assume columns may be missing.
            let need_fill_missing_columns =
                if let Some(ref region_metadata) = sender_req.request.region_metadata {
                    region_ctx.version().metadata.schema_version != region_metadata.schema_version
                } else {
                    true
                };
            // Only fill missing columns if primary key is dense encoded.
            if need_fill_missing_columns
                && sender_req.request.primary_key_encoding() == PrimaryKeyEncoding::Dense
                && let Err(e) = sender_req
                    .request
                    .maybe_fill_missing_columns(&region_ctx.version().metadata)
            {
                sender_req.sender.send(Err(e));

                continue;
            }

            // Collect requests by region.
            region_ctx.push_mutation(
                sender_req.request.op_type as i32,
                Some(sender_req.request.rows),
                sender_req.request.hint,
                sender_req.sender,
                None,
            );
        }
    }

    /// Processes bulk insert requests.
    ///
    /// Mirrors `process_write_requests` for bulk payloads: validates each
    /// request, creates per-region contexts on first use, and pushes the
    /// payload into the region's write context.
    fn process_bulk_requests(
        &mut self,
        region_ctxs: &mut HashMap<RegionId, RegionWriteCtx>,
        requests: &mut Vec<SenderBulkRequest>,
    ) {
        let _timer = metrics::REGION_WORKER_HANDLE_WRITE_ELAPSED
            .with_label_values(&["prepare_bulk_request"])
            .start_timer();
        for mut bulk_req in requests.drain(..) {
            let region_id = bulk_req.region_id;
            // If region is waiting for alteration, add requests to pending writes.
            if self.flush_scheduler.has_pending_ddls(region_id) {
                // Safety: The region has pending ddls.
                self.flush_scheduler.add_bulk_request_to_pending(bulk_req);
                continue;
            }

            // Checks whether the region exists and is it stalling.
            if let hash_map::Entry::Vacant(e) = region_ctxs.entry(region_id) {
                let Some(region) = self.regions.get_region_or(region_id, &mut bulk_req.sender)
                else {
                    continue;
                };
                match region.state() {
                    RegionRoleState::Leader(RegionLeaderState::Writable)
                    | RegionRoleState::Leader(RegionLeaderState::Staging) => {
                        if region.reject_all_writes_in_staging() {
                            bulk_req.sender.send(RejectWriteSnafu { region_id }.fail());
                            continue;
                        }
                        // First request for this region: create its write context.
                        let region_ctx = RegionWriteCtx::new(
                            region.region_id,
                            &region.version_control,
                            region.provider.clone(),
                            Some(region.written_bytes.clone()),
                        );

                        e.insert(region_ctx);
                    }
                    RegionRoleState::Leader(RegionLeaderState::Altering) => {
                        debug!(
                            "Region {} is altering, add request to pending writes",
                            region.region_id
                        );
                        self.stalling_count.add(1);
                        WRITE_STALL_TOTAL.inc();
                        self.stalled_requests.push_bulk(bulk_req);
                        continue;
                    }
                    state => {
                        // The region is not writable.
                        bulk_req.sender.send(
                            RegionStateSnafu {
                                region_id,
                                state,
                                expect: RegionRoleState::Leader(RegionLeaderState::Writable),
                            }
                            .fail(),
                        );
                        continue;
                    }
                }
            }

            // Safety: Now we ensure the region exists.
            let region_ctx = region_ctxs.get_mut(&region_id).unwrap();
            // Fetch the region again: the binding above is scoped to the
            // vacant-entry branch and is not visible here.
            let Some(region) = self.regions.get_region_or(region_id, &mut bulk_req.sender) else {
                continue;
            };
            if region.reject_all_writes_in_staging() {
                bulk_req.sender.send(RejectWriteSnafu { region_id }.fail());
                continue;
            }
            let expected_version = region.expected_partition_expr_version();
            if let Err(e) = check_partition_expr_version(
                region_id,
                expected_version,
                bulk_req.partition_expr_version,
            ) {
                bulk_req.sender.send(Err(e));
                continue;
            }

            // Double-check the request schema
            let need_fill_missing_columns = region_ctx.version().metadata.schema_version
                != bulk_req.region_metadata.schema_version;

            // Fill missing columns if needed
            if need_fill_missing_columns
                && let Err(e) = bulk_req
                    .request
                    .fill_missing_columns(&region_ctx.version().metadata)
            {
                bulk_req.sender.send(Err(e));
                continue;
            }

            // Collect requests by region.
            if !region_ctx.push_bulk(bulk_req.sender, bulk_req.request, None) {
                // NOTE(review): returning here aborts the whole loop and drops any
                // remaining bulk requests without replying to their senders —
                // confirm `push_bulk` returning false is meant to stop processing
                // rather than `continue` to the next request.
                return;
            }
        }
    }

    /// Returns true if the engine needs to reject some write requests.
    pub(crate) fn should_reject_write(&self) -> bool {
        // If memory usage reaches high threshold (we should also consider stalled requests) returns true.
        self.write_buffer_manager.memory_usage() + self.stalled_requests.estimated_size
            >= self.config.global_write_buffer_reject_size.as_bytes() as usize
    }
}
483
484/// Send rejected error to all `write_requests`.
485fn reject_write_requests(
486    write_requests: &mut Vec<SenderWriteRequest>,
487    bulk_requests: &mut Vec<SenderBulkRequest>,
488) {
489    WRITE_REJECT_TOTAL.inc_by(write_requests.len() as u64);
490
491    for req in write_requests.drain(..) {
492        req.sender.send(
493            RejectWriteSnafu {
494                region_id: req.request.region_id,
495            }
496            .fail(),
497        );
498    }
499    for req in bulk_requests.drain(..) {
500        let region_id = req.region_id;
501        req.sender.send(RejectWriteSnafu { region_id }.fail());
502    }
503}
504
505/// Rejects delete request under append mode.
506fn check_op_type(append_mode: bool, request: &WriteRequest) -> Result<()> {
507    if append_mode {
508        ensure!(
509            request.op_type == OpType::Put,
510            InvalidRequestSnafu {
511                region_id: request.region_id,
512                reason: "DELETE is not allowed under append mode",
513            }
514        );
515    }
516
517    Ok(())
518}
519
520fn check_partition_expr_version(
521    region_id: RegionId,
522    expected_version: u64,
523    request_version: Option<u64>,
524) -> Result<()> {
525    let request_version = match request_version {
526        None => return Ok(()),
527        Some(value) => value,
528    };
529    if request_version != expected_version {
530        return PartitionExprVersionMismatchSnafu {
531            region_id,
532            request_version,
533            expected_version,
534        }
535        .fail();
536    }
537    Ok(())
538}