
mito2/worker/handle_write.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Handling write requests.

use std::collections::{HashMap, hash_map};
use std::sync::Arc;

use api::v1::OpType;
use common_telemetry::{debug, error};
use common_wal::options::WalOptions;
use snafu::ensure;
use store_api::codec::PrimaryKeyEncoding;
use store_api::logstore::LogStore;
use store_api::storage::RegionId;

use crate::error::{
    InvalidRequestSnafu, PartitionExprVersionMismatchSnafu, RegionNotFoundSnafu, RegionStateSnafu,
    RejectWriteSnafu, Result,
};
use crate::metrics;
use crate::metrics::{
    WRITE_REJECT_TOTAL, WRITE_ROWS_TOTAL, WRITE_STAGE_ELAPSED, WRITE_STALL_TOTAL,
};
use crate::region::{RegionLeaderState, RegionRoleState};
use crate::region_write_ctx::RegionWriteCtx;
use crate::request::{SenderBulkRequest, SenderWriteRequest, WriteRequest};
use crate::worker::RegionWorkerLoop;

impl<S: LogStore> RegionWorkerLoop<S> {
    /// Takes and handles all write requests.
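    ///
    /// A batch moves through these stages: an optional worker flush, the
    /// reject/stall checks under memory pressure, per-region write context
    /// preparation, a WAL write, and finally the memtable writes.
    ///
    /// A sketch of a caller draining one batch (`buffer` and its accessors are
    /// hypothetical; the real call sites live in the worker dispatch loop):
    ///
    /// ```ignore
    /// let mut writes: Vec<SenderWriteRequest> = buffer.take_write_requests();
    /// let mut bulks: Vec<SenderBulkRequest> = buffer.take_bulk_requests();
    /// // The first attempt may stall the batch under memory pressure.
    /// worker.handle_write_requests(&mut writes, &mut bulks, true).await;
    /// ```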
    pub(crate) async fn handle_write_requests(
        &mut self,
        write_requests: &mut Vec<SenderWriteRequest>,
        bulk_requests: &mut Vec<SenderBulkRequest>,
        allow_stall: bool,
    ) {
        if write_requests.is_empty() && bulk_requests.is_empty() {
            return;
        }

        // Flush this worker if the engine needs to flush.
        self.maybe_flush_worker();

        if self.should_reject_write() {
            // The memory pressure is still too high; reject the write requests.
            reject_write_requests(write_requests, bulk_requests);
            // Also reject all stalled requests.
            self.reject_stalled_requests();
            return;
        }

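        // Under stall pressure (but below the reject threshold), park the whole
        // batch in `stalled_requests`; `handle_stalled_requests` retries it later.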
        if self.write_buffer_manager.should_stall() && allow_stall {
            let stalled_count = (write_requests.len() + bulk_requests.len()) as i64;
            self.stalling_count.add(stalled_count);
            WRITE_STALL_TOTAL.inc_by(stalled_count as u64);
            self.stalled_requests.append(write_requests, bulk_requests);
            self.listener.on_write_stall();
            return;
        }

        // Prepare write context.
        let mut region_ctxs = {
            let _timer = WRITE_STAGE_ELAPSED
                .with_label_values(&["prepare_ctx"])
                .start_timer();
            self.prepare_region_write_ctx(write_requests, bulk_requests)
        };

        // Write to WAL.
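        // Every entry in the batch goes through a single WAL writer and is
        // flushed by one `write_to_wal` call; if that call fails, the same error
        // is set on every region context in the batch.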
        {
            let _timer = WRITE_STAGE_ELAPSED
                .with_label_values(&["write_wal"])
                .start_timer();
            let mut wal_writer = self.wal.writer();
            for region_ctx in region_ctxs.values_mut() {
                if let WalOptions::Noop = &region_ctx.version().options.wal_options {
                    // Skip the WAL write for noop regions.
                    continue;
                }
                if let Err(e) = region_ctx.add_wal_entry(&mut wal_writer).map_err(Arc::new) {
                    region_ctx.set_error(e);
                }
            }
            match wal_writer.write_to_wal().await.map_err(Arc::new) {
                Ok(response) => {
                    for (region_id, region_ctx) in region_ctxs.iter_mut() {
                        if let WalOptions::Noop = &region_ctx.version().options.wal_options {
                            continue;
                        }

                        // Safety: the log store implementation ensures that either `write_to_wal`
                        // fails and no response is returned, or the last entry ids for each region
                        // do exist.
                        let last_entry_id = response.last_entry_ids.get(region_id).unwrap();
                        region_ctx.set_next_entry_id(last_entry_id + 1);
                    }
                }
                Err(e) => {
                    // Failed to write to the WAL.
                    for mut region_ctx in region_ctxs.into_values() {
                        region_ctx.set_error(e.clone());
                    }
                    return;
                }
            }
        }

        let (mut put_rows, mut delete_rows) = (0, 0);
        // Write to memtables.
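        // A single-region batch writes inline on this worker thread; a
        // multi-region batch fans out one task per region and joins them all.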
        {
            let _timer = WRITE_STAGE_ELAPSED
                .with_label_values(&["write_memtable"])
                .start_timer();
            if region_ctxs.len() == 1 {
                // Fast path for a single region.
                let mut region_ctx = region_ctxs.into_values().next().unwrap();
                region_ctx.write_memtable().await;
                region_ctx.write_bulk().await;
                put_rows += region_ctx.put_num;
                delete_rows += region_ctx.delete_num;
            } else {
                let region_write_task = region_ctxs
                    .into_values()
                    .map(|mut region_ctx| {
                        // Use the tokio runtime to schedule the tasks.
                        common_runtime::spawn_global(async move {
                            region_ctx.write_memtable().await;
                            region_ctx.write_bulk().await;
                            (region_ctx.put_num, region_ctx.delete_num)
                        })
                    })
                    .collect::<Vec<_>>();

                for result in futures::future::join_all(region_write_task).await {
                    match result {
                        Ok((put, delete)) => {
                            put_rows += put;
                            delete_rows += delete;
                        }
                        Err(e) => {
                            error!(e; "unexpected error when joining region write tasks");
                        }
                    }
                }
            }
        }
        WRITE_ROWS_TOTAL
            .with_label_values(&["put"])
            .inc_by(put_rows as u64);
        WRITE_ROWS_TOTAL
            .with_label_values(&["delete"])
            .inc_by(delete_rows as u64);
    }

    /// Handles all stalled write requests.
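    ///
    /// Retries every stalled batch with `allow_stall = false` so that memory
    /// backpressure cannot stall the same requests a second time.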
    pub(crate) async fn handle_stalled_requests(&mut self) {
        // Handle stalled requests.
        let stalled = std::mem::take(&mut self.stalled_requests);
        self.stalling_count.sub(stalled.stalled_count() as i64);
        // These requests were already stalled; don't stall them again.
        for (_, (_, mut requests, mut bulk)) in stalled.requests {
            self.handle_write_requests(&mut requests, &mut bulk, false)
                .await;
        }
    }

    /// Rejects all stalled requests.
    pub(crate) fn reject_stalled_requests(&mut self) {
        let stalled = std::mem::take(&mut self.stalled_requests);
        self.stalling_count.sub(stalled.stalled_count() as i64);
        for (_, (_, mut requests, mut bulk)) in stalled.requests {
            reject_write_requests(&mut requests, &mut bulk);
        }
    }

    /// Rejects a specific region's stalled requests.
    pub(crate) fn reject_region_stalled_requests(&mut self, region_id: &RegionId) {
        debug!("Rejects stalled requests for region {}", region_id);
        let (mut requests, mut bulk) = self.stalled_requests.remove(region_id);
        self.stalling_count
            .sub((requests.len() + bulk.len()) as i64);
        reject_write_requests(&mut requests, &mut bulk);
    }

    /// Fails a specific region's stalled requests if the region no longer exists.
    pub(crate) fn fail_region_stalled_requests_as_not_found(&mut self, region_id: &RegionId) {
        debug!(
            "Fails stalled requests for region {} as region not found",
            region_id
        );
        let (requests, bulk) = self.stalled_requests.remove(region_id);
        self.stalling_count
            .sub((requests.len() + bulk.len()) as i64);

        for req in requests {
            req.sender.send(
                RegionNotFoundSnafu {
                    region_id: req.request.region_id,
                }
                .fail(),
            );
        }
        for req in bulk {
            req.sender.send(
                RegionNotFoundSnafu {
                    region_id: req.region_id,
                }
                .fail(),
            );
        }
    }

    /// Handles a specific region's stalled requests.
    ///
    /// `allow_stall` should be false for backpressure retry paths to avoid stalling the same
    /// requests again. It should remain true for non-backpressure retries, such as requests stalled
    /// by alter, staging, and region editing. Global reject backpressure still applies before the
    /// stall check.
    pub(crate) async fn handle_region_stalled_requests(
        &mut self,
        region_id: &RegionId,
        allow_stall: bool,
    ) {
        debug!("Handles stalled requests for region {}", region_id);
        let (mut requests, mut bulk) = self.stalled_requests.remove(region_id);
        self.stalling_count
            .sub((requests.len() + bulk.len()) as i64);
        self.handle_write_requests(&mut requests, &mut bulk, allow_stall)
            .await;
    }

    /// Processes same-batch writes for a region before handling its edit-completion notification.
    ///
    /// The worker dispatch loop handles background notifications before the current batch's write
    /// buffer. Without this step, writes that arrived during edit N could be classified only after
    /// edit N+1 is started, placing them behind that next edit.
    pub(crate) async fn handle_buffered_region_write_requests(
        &mut self,
        region_id: &RegionId,
        write_requests: &mut Vec<SenderWriteRequest>,
        bulk_requests: &mut Vec<SenderBulkRequest>,
    ) {
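        // Drain only this region's requests; requests for other regions stay in
        // the batch buffers and are handled as usual.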
        let mut current_region_write_requests = write_requests
            .extract_if(.., |r| r.request.region_id == *region_id)
            .collect::<Vec<_>>();

        let mut current_region_bulk_requests = bulk_requests
            .extract_if(.., |r| r.region_id == *region_id)
            .collect::<Vec<_>>();

        self.handle_write_requests(
            &mut current_region_write_requests,
            &mut current_region_bulk_requests,
            true,
        )
        .await;
    }
}

impl<S> RegionWorkerLoop<S> {
    /// Validates and groups requests by region.
    fn prepare_region_write_ctx(
        &mut self,
        write_requests: &mut Vec<SenderWriteRequest>,
        bulk_requests: &mut Vec<SenderBulkRequest>,
    ) -> HashMap<RegionId, RegionWriteCtx> {
        // Initialize region write context map.
        let mut region_ctxs = HashMap::new();
        self.process_write_requests(&mut region_ctxs, write_requests);
        self.process_bulk_requests(&mut region_ctxs, bulk_requests);
        region_ctxs
    }

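    /// Routes row write requests: parks them behind pending DDLs or transient
    /// region states, validates each request, and pushes the accepted mutations
    /// into the per-region write contexts.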
    fn process_write_requests(
        &mut self,
        region_ctxs: &mut HashMap<RegionId, RegionWriteCtx>,
        write_requests: &mut Vec<SenderWriteRequest>,
    ) {
        for mut sender_req in write_requests.drain(..) {
            let region_id = sender_req.request.region_id;

            // If the region is waiting for alteration, add the request to the pending writes.
            if self.flush_scheduler.has_pending_ddls(region_id) {
                // TODO(yingwen): consider adding some metrics for this.
                // Safety: The region has pending ddls.
                self.flush_scheduler
                    .add_write_request_to_pending(sender_req);
                continue;
            }

            // Checks whether the region exists and whether it is stalled.
            if let hash_map::Entry::Vacant(e) = region_ctxs.entry(region_id) {
                let Some(region) = self
                    .regions
                    .get_region_or(region_id, &mut sender_req.sender)
                else {
                    // No such region.
                    continue;
                };
                #[cfg(test)]
                debug!(
                    "Handling write request for region {}, state: {:?}",
                    region_id,
                    region.state()
                );
                match region.state() {
                    RegionRoleState::Leader(RegionLeaderState::Writable)
                    | RegionRoleState::Leader(RegionLeaderState::Staging) => {
                        if region.reject_all_writes_in_staging() {
                            sender_req
                                .sender
                                .send(RejectWriteSnafu { region_id }.fail());
                            continue;
                        }

                        let region_ctx = RegionWriteCtx::new(
                            region.region_id,
                            &region.version_control,
                            region.provider.clone(),
                            Some(region.written_bytes.clone()),
                        );

                        e.insert(region_ctx);
                    }
                    RegionRoleState::Leader(RegionLeaderState::Altering)
                    | RegionRoleState::Leader(RegionLeaderState::Editing) => {
                        // Editing is transient: queue the write so edit completion can drain it
                        // before starting the next queued edit.
                        debug!(
                            "Region {} is {:?}, add request to pending writes",
                            region.region_id,
                            region.state()
                        );
                        self.stalling_count.add(1);
                        WRITE_STALL_TOTAL.inc();
                        self.stalled_requests.push(sender_req);
                        continue;
                    }
                    RegionRoleState::Leader(RegionLeaderState::EnteringStaging) => {
                        debug!(
                            "Region {} is entering staging, add request to pending writes",
                            region.region_id
                        );
                        self.stalling_count.add(1);
                        WRITE_STALL_TOTAL.inc();
                        self.stalled_requests.push(sender_req);
                        continue;
                    }
                    state => {
                        // The region is not writable.
                        sender_req.sender.send(
                            RegionStateSnafu {
                                region_id,
                                state,
                                expect: RegionRoleState::Leader(RegionLeaderState::Writable),
                            }
                            .fail(),
                        );
                        continue;
                    }
                }
            }

            // Safety: we just ensured the region exists.
            let region_ctx = region_ctxs.get_mut(&region_id).unwrap();
            let Some(region) = self
                .regions
                .get_region_or(region_id, &mut sender_req.sender)
            else {
                continue;
            };
            if region.reject_all_writes_in_staging() {
                sender_req
                    .sender
                    .send(RejectWriteSnafu { region_id }.fail());
                continue;
            }
            let expected_version = region.expected_partition_expr_version();
            if let Err(e) = check_partition_expr_version(
                region_id,
                expected_version,
                sender_req.request.partition_expr_version,
            ) {
                sender_req.sender.send(Err(e));
                continue;
            }

            if let Err(e) = check_op_type(
                region_ctx.version().options.append_mode,
                &sender_req.request,
            ) {
                // Do not allow non-put ops under append mode.
                sender_req.sender.send(Err(e));

                continue;
            }

            // Double-check the request schema.
            let need_fill_missing_columns =
                if let Some(ref region_metadata) = sender_req.request.region_metadata {
                    region_ctx.version().metadata.schema_version != region_metadata.schema_version
                } else {
                    true
                };
            // Only fill missing columns if the primary key is dense encoded.
            if need_fill_missing_columns
                && sender_req.request.primary_key_encoding() == PrimaryKeyEncoding::Dense
                && let Err(e) = sender_req
                    .request
                    .maybe_fill_missing_columns(&region_ctx.version().metadata)
            {
                sender_req.sender.send(Err(e));

                continue;
            }

            // Collect requests by region.
            region_ctx.push_mutation(
                sender_req.request.op_type as i32,
                Some(sender_req.request.rows),
                sender_req.request.hint,
                sender_req.sender,
                None,
            );
        }
    }

    /// Processes bulk insert requests.
    fn process_bulk_requests(
        &mut self,
        region_ctxs: &mut HashMap<RegionId, RegionWriteCtx>,
        requests: &mut Vec<SenderBulkRequest>,
    ) {
        let _timer = metrics::REGION_WORKER_HANDLE_WRITE_ELAPSED
            .with_label_values(&["prepare_bulk_request"])
            .start_timer();
        for mut bulk_req in requests.drain(..) {
            let region_id = bulk_req.region_id;
            // If the region is waiting for alteration, add the request to the pending writes.
            if self.flush_scheduler.has_pending_ddls(region_id) {
                // Safety: The region has pending ddls.
                self.flush_scheduler.add_bulk_request_to_pending(bulk_req);
                continue;
            }

            // Checks whether the region exists and whether it is stalled.
            if let hash_map::Entry::Vacant(e) = region_ctxs.entry(region_id) {
                let Some(region) = self.regions.get_region_or(region_id, &mut bulk_req.sender)
                else {
                    continue;
                };
                match region.state() {
                    RegionRoleState::Leader(RegionLeaderState::Writable)
                    | RegionRoleState::Leader(RegionLeaderState::Staging) => {
                        if region.reject_all_writes_in_staging() {
                            bulk_req.sender.send(RejectWriteSnafu { region_id }.fail());
                            continue;
                        }
                        let region_ctx = RegionWriteCtx::new(
                            region.region_id,
                            &region.version_control,
                            region.provider.clone(),
                            Some(region.written_bytes.clone()),
                        );

                        e.insert(region_ctx);
                    }
                    RegionRoleState::Leader(RegionLeaderState::Altering)
                    | RegionRoleState::Leader(RegionLeaderState::Editing) => {
                        // Editing is transient: queue the bulk write so edit completion can drain
                        // it before starting the next queued edit.
                        debug!(
                            "Region {} is {:?}, add request to pending writes",
                            region.region_id,
                            region.state()
                        );
                        self.stalling_count.add(1);
                        WRITE_STALL_TOTAL.inc();
                        self.stalled_requests.push_bulk(bulk_req);
                        continue;
                    }
                    state => {
                        // The region is not writable.
                        bulk_req.sender.send(
                            RegionStateSnafu {
                                region_id,
                                state,
                                expect: RegionRoleState::Leader(RegionLeaderState::Writable),
                            }
                            .fail(),
                        );
                        continue;
                    }
                }
            }

            // Safety: we just ensured the region exists.
            let region_ctx = region_ctxs.get_mut(&region_id).unwrap();
            let Some(region) = self.regions.get_region_or(region_id, &mut bulk_req.sender) else {
                continue;
            };
            if region.reject_all_writes_in_staging() {
                bulk_req.sender.send(RejectWriteSnafu { region_id }.fail());
                continue;
            }
            let expected_version = region.expected_partition_expr_version();
            if let Err(e) = check_partition_expr_version(
                region_id,
                expected_version,
                bulk_req.partition_expr_version,
            ) {
                bulk_req.sender.send(Err(e));
                continue;
            }

            // Double-check the request schema.
            let need_fill_missing_columns = region_ctx.version().metadata.schema_version
                != bulk_req.region_metadata.schema_version;

            // Fill missing columns if needed.
            if need_fill_missing_columns
                && let Err(e) = bulk_req
                    .request
                    .fill_missing_columns(&region_ctx.version().metadata)
            {
                bulk_req.sender.send(Err(e));
                continue;
            }

            // Collect requests by region.
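            // Note: a `false` from `push_bulk` aborts processing of the
            // remaining bulk requests in this batch.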
            if !region_ctx.push_bulk(bulk_req.sender, bulk_req.request, None) {
                return;
            }
        }
    }

    /// Returns true if the engine needs to reject some write requests.
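    ///
    /// Writes are rejected once `memory_usage + stalled_requests.estimated_size`
    /// reaches `global_write_buffer_reject_size`.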
    pub(crate) fn should_reject_write(&self) -> bool {
        // Memory usage must also account for the stalled requests.
        self.write_buffer_manager.memory_usage() + self.stalled_requests.estimated_size
            >= self.config.global_write_buffer_reject_size.as_bytes() as usize
    }
}

/// Sends a rejected error to all `write_requests` and `bulk_requests`.
fn reject_write_requests(
    write_requests: &mut Vec<SenderWriteRequest>,
    bulk_requests: &mut Vec<SenderBulkRequest>,
) {
    WRITE_REJECT_TOTAL.inc_by(write_requests.len() as u64);

    for req in write_requests.drain(..) {
        req.sender.send(
            RejectWriteSnafu {
                region_id: req.request.region_id,
            }
            .fail(),
        );
    }
    for req in bulk_requests.drain(..) {
        let region_id = req.region_id;
        req.sender.send(RejectWriteSnafu { region_id }.fail());
    }
}

/// Rejects delete requests under append mode.
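///
/// A minimal sketch of the behavior (`put_request` and `delete_request` are
/// hypothetical `WriteRequest` values):
///
/// ```ignore
/// assert!(check_op_type(true, &put_request).is_ok());
/// assert!(check_op_type(true, &delete_request).is_err()); // rejected
/// assert!(check_op_type(false, &delete_request).is_ok());
/// ```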
fn check_op_type(append_mode: bool, request: &WriteRequest) -> Result<()> {
    if append_mode {
        ensure!(
            request.op_type == OpType::Put,
            InvalidRequestSnafu {
                region_id: request.region_id,
                reason: "DELETE is not allowed under append mode",
            }
        );
    }

    Ok(())
}

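/// Ensures the request's partition expression version matches the version the
/// region expects. Requests that carry no version are accepted as-is.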
fn check_partition_expr_version(
    region_id: RegionId,
    expected_version: u64,
    request_version: Option<u64>,
) -> Result<()> {
    let request_version = match request_version {
        None => return Ok(()),
        Some(value) => value,
    };
    if request_version != expected_version {
        return PartitionExprVersionMismatchSnafu {
            region_id,
            request_version,
            expected_version,
        }
        .fail();
    }
    Ok(())
}