mito2/worker/handle_write.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Handling write requests.

use std::collections::{hash_map, HashMap};
use std::sync::Arc;

use api::v1::OpType;
use common_telemetry::{debug, error};
use common_wal::options::WalOptions;
use snafu::ensure;
use store_api::codec::PrimaryKeyEncoding;
use store_api::logstore::LogStore;
use store_api::storage::RegionId;

use crate::error::{InvalidRequestSnafu, RegionStateSnafu, RejectWriteSnafu, Result};
use crate::metrics;
use crate::metrics::{
    WRITE_REJECT_TOTAL, WRITE_ROWS_TOTAL, WRITE_STAGE_ELAPSED, WRITE_STALL_TOTAL,
};
use crate::region::{RegionLeaderState, RegionRoleState};
use crate::region_write_ctx::RegionWriteCtx;
use crate::request::{SenderBulkRequest, SenderWriteRequest, WriteRequest};
use crate::worker::RegionWorkerLoop;

impl<S: LogStore> RegionWorkerLoop<S> {
    /// Takes and handles all write requests.
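    ///
    /// Flushes the worker first if needed. Under memory pressure the requests may be
    /// stalled (parked for a later retry) or rejected outright; `allow_stall` is `false`
    /// when replaying requests that were already stalled once.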
    pub(crate) async fn handle_write_requests(
        &mut self,
        write_requests: &mut Vec<SenderWriteRequest>,
        bulk_requests: &mut Vec<SenderBulkRequest>,
        allow_stall: bool,
    ) {
        if write_requests.is_empty() && bulk_requests.is_empty() {
            return;
        }

        // Flush this worker if the engine needs to flush.
        self.maybe_flush_worker();

        if self.should_reject_write() {
            // The memory pressure is still too high, reject write requests.
            reject_write_requests(write_requests, bulk_requests);
            // Also reject all stalled requests.
            self.reject_stalled_requests();
            return;
        }

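        // The write buffer is under pressure but still below the reject threshold: park
        // the requests in the stalled queue so they can be retried later (e.g. once a
        // flush releases memory).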
        if self.write_buffer_manager.should_stall() && allow_stall {
            let stalled_count = (write_requests.len() + bulk_requests.len()) as i64;
            self.stalling_count.add(stalled_count);
            WRITE_STALL_TOTAL.inc_by(stalled_count as u64);
            self.stalled_requests.append(write_requests, bulk_requests);
            self.listener.on_write_stall();
            return;
        }

        // Prepare write context.
        let mut region_ctxs = {
            let _timer = WRITE_STAGE_ELAPSED
                .with_label_values(&["prepare_ctx"])
                .start_timer();
            self.prepare_region_write_ctx(write_requests, bulk_requests)
        };

        // Write to WAL.
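        // All mutations prepared above go through a single WAL writer, so one log store
        // write covers every region in this batch; regions configured with noop WAL
        // options are skipped.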
        {
            let _timer = WRITE_STAGE_ELAPSED
                .with_label_values(&["write_wal"])
                .start_timer();
            let mut wal_writer = self.wal.writer();
            for region_ctx in region_ctxs.values_mut() {
                if let WalOptions::Noop = &region_ctx.version().options.wal_options {
                    // Skip wal write for noop region.
                    continue;
                }
                if let Err(e) = region_ctx.add_wal_entry(&mut wal_writer).map_err(Arc::new) {
                    region_ctx.set_error(e);
                }
            }
            match wal_writer.write_to_wal().await.map_err(Arc::new) {
                Ok(response) => {
                    for (region_id, region_ctx) in region_ctxs.iter_mut() {
                        if let WalOptions::Noop = &region_ctx.version().options.wal_options {
                            continue;
                        }

                        // Safety: the log store implementation ensures that either the `write_to_wal` fails and no
                        // response is returned or the last entry ids for each region do exist.
                        let last_entry_id = response.last_entry_ids.get(region_id).unwrap();
                        region_ctx.set_next_entry_id(last_entry_id + 1);
                    }
                }
                Err(e) => {
                    // Failed to write wal.
                    for mut region_ctx in region_ctxs.into_values() {
                        region_ctx.set_error(e.clone());
                    }
                    return;
                }
            }
        }

        let (mut put_rows, mut delete_rows) = (0, 0);
        // Write to memtables.
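        // The WAL write succeeded, so the mutations can be applied to the memtables.
        // Row counts are accumulated only after the rows have been applied.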
        {
            let _timer = WRITE_STAGE_ELAPSED
                .with_label_values(&["write_memtable"])
                .start_timer();
            if region_ctxs.len() == 1 {
                // Fast path for a single region.
                let mut region_ctx = region_ctxs.into_values().next().unwrap();
                region_ctx.write_memtable().await;
                region_ctx.write_bulk().await;
                put_rows += region_ctx.put_num;
                delete_rows += region_ctx.delete_num;
            } else {
                let region_write_task = region_ctxs
                    .into_values()
                    .map(|mut region_ctx| {
                        // Use the tokio runtime to schedule one write task per region.
                        common_runtime::spawn_global(async move {
                            region_ctx.write_memtable().await;
                            region_ctx.write_bulk().await;
                            (region_ctx.put_num, region_ctx.delete_num)
                        })
                    })
                    .collect::<Vec<_>>();

                for result in futures::future::join_all(region_write_task).await {
                    match result {
                        Ok((put, delete)) => {
                            put_rows += put;
                            delete_rows += delete;
                        }
                        Err(e) => {
                            error!(e; "unexpected error when joining region write tasks");
                        }
                    }
                }
            }
        }
        WRITE_ROWS_TOTAL
            .with_label_values(&["put"])
            .inc_by(put_rows as u64);
        WRITE_ROWS_TOTAL
            .with_label_values(&["delete"])
            .inc_by(delete_rows as u64);
    }

    /// Handles all stalled write requests.
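    ///
    /// Stalled requests are replayed with `allow_stall = false`, so they are either
    /// written or rejected but never parked a second time.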
    pub(crate) async fn handle_stalled_requests(&mut self) {
        // Handle stalled requests.
        let stalled = std::mem::take(&mut self.stalled_requests);
        self.stalling_count.sub(stalled.stalled_count() as i64);
        // We already stalled these requests, don't stall them again.
        for (_, (_, mut requests, mut bulk)) in stalled.requests {
            self.handle_write_requests(&mut requests, &mut bulk, false)
                .await;
        }
    }

    /// Rejects all stalled requests.
    pub(crate) fn reject_stalled_requests(&mut self) {
        let stalled = std::mem::take(&mut self.stalled_requests);
        self.stalling_count.sub(stalled.stalled_count() as i64);
        for (_, (_, mut requests, mut bulk)) in stalled.requests {
            reject_write_requests(&mut requests, &mut bulk);
        }
    }

    /// Rejects a specific region's stalled requests.
    pub(crate) fn reject_region_stalled_requests(&mut self, region_id: &RegionId) {
        debug!("Rejects stalled requests for region {}", region_id);
        let (mut requests, mut bulk) = self.stalled_requests.remove(region_id);
        self.stalling_count
            .sub((requests.len() + bulk.len()) as i64);
        reject_write_requests(&mut requests, &mut bulk);
    }

    /// Handles a specific region's stalled requests.
    pub(crate) async fn handle_region_stalled_requests(&mut self, region_id: &RegionId) {
        debug!("Handles stalled requests for region {}", region_id);
        let (mut requests, mut bulk) = self.stalled_requests.remove(region_id);
        self.stalling_count
            .sub((requests.len() + bulk.len()) as i64);
        self.handle_write_requests(&mut requests, &mut bulk, true)
            .await;
    }
}

impl<S> RegionWorkerLoop<S> {
    /// Validates and groups requests by region.
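    ///
    /// Requests for regions with pending DDLs are forwarded to the flush scheduler's
    /// pending writes, and requests for regions that are currently altering are stalled
    /// instead of being grouped into a write context.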
    fn prepare_region_write_ctx(
        &mut self,
        write_requests: &mut Vec<SenderWriteRequest>,
        bulk_requests: &mut Vec<SenderBulkRequest>,
    ) -> HashMap<RegionId, RegionWriteCtx> {
        // Initialize region write context map.
        let mut region_ctxs = HashMap::new();
        self.process_write_requests(&mut region_ctxs, write_requests);
        self.process_bulk_requests(&mut region_ctxs, bulk_requests);
        region_ctxs
    }

    fn process_write_requests(
        &mut self,
        region_ctxs: &mut HashMap<RegionId, RegionWriteCtx>,
        write_requests: &mut Vec<SenderWriteRequest>,
    ) {
        for mut sender_req in write_requests.drain(..) {
            let region_id = sender_req.request.region_id;

            // If region is waiting for alteration, add requests to pending writes.
            if self.flush_scheduler.has_pending_ddls(region_id) {
                // TODO(yingwen): consider adding some metrics for this.
                // Safety: The region has pending ddls.
                self.flush_scheduler
                    .add_write_request_to_pending(sender_req);
                continue;
            }

            // Checks whether the region exists and whether it is stalling.
            if let hash_map::Entry::Vacant(e) = region_ctxs.entry(region_id) {
                let Some(region) = self
                    .regions
                    .get_region_or(region_id, &mut sender_req.sender)
                else {
                    // No such region.
                    continue;
                };
                match region.state() {
                    RegionRoleState::Leader(RegionLeaderState::Writable)
                    | RegionRoleState::Leader(RegionLeaderState::Staging) => {
                        let region_ctx = RegionWriteCtx::new(
                            region.region_id,
                            &region.version_control,
                            region.provider.clone(),
                            Some(region.written_bytes.clone()),
                        );

                        e.insert(region_ctx);
                    }
                    RegionRoleState::Leader(RegionLeaderState::Altering) => {
                        debug!(
                            "Region {} is altering, add request to pending writes",
                            region.region_id
                        );
                        self.stalling_count.add(1);
                        WRITE_STALL_TOTAL.inc();
                        self.stalled_requests.push(sender_req);
                        continue;
                    }
                    state => {
                        // The region is not writable.
                        sender_req.sender.send(
                            RegionStateSnafu {
                                region_id,
                                state,
                                expect: RegionRoleState::Leader(RegionLeaderState::Writable),
                            }
                            .fail(),
                        );
                        continue;
                    }
                }
            }

            // Safety: Now we ensure the region exists.
            let region_ctx = region_ctxs.get_mut(&region_id).unwrap();

            if let Err(e) = check_op_type(
                region_ctx.version().options.append_mode,
                &sender_req.request,
            ) {
                // Do not allow non-put op under append mode.
                sender_req.sender.send(Err(e));

                continue;
            }

            // Double-check the request schema
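            // If the request carries no region metadata or was built against a different
            // schema version, columns may be missing; they are filled from the current
            // region metadata where possible.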
            let need_fill_missing_columns =
                if let Some(ref region_metadata) = sender_req.request.region_metadata {
                    region_ctx.version().metadata.schema_version != region_metadata.schema_version
                } else {
                    true
                };
            // Only fill missing columns if primary key is dense encoded.
            if need_fill_missing_columns
                && sender_req.request.primary_key_encoding() == PrimaryKeyEncoding::Dense
            {
                if let Err(e) = sender_req
                    .request
                    .maybe_fill_missing_columns(&region_ctx.version().metadata)
                {
                    sender_req.sender.send(Err(e));

                    continue;
                }
            }

            // Collect requests by region.
            region_ctx.push_mutation(
                sender_req.request.op_type as i32,
                Some(sender_req.request.rows),
                sender_req.request.hint,
                sender_req.sender,
            );
        }
    }

    /// Processes bulk insert requests.
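    ///
    /// Bulk requests whose schema version differs from the region's current schema are
    /// rejected, since filling default columns is not supported for bulk inserts yet.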
    fn process_bulk_requests(
        &mut self,
        region_ctxs: &mut HashMap<RegionId, RegionWriteCtx>,
        requests: &mut Vec<SenderBulkRequest>,
    ) {
        let _timer = metrics::REGION_WORKER_HANDLE_WRITE_ELAPSED
            .with_label_values(&["prepare_bulk_request"])
            .start_timer();
        for mut bulk_req in requests.drain(..) {
            let region_id = bulk_req.region_id;
            // If region is waiting for alteration, add requests to pending writes.
            if self.flush_scheduler.has_pending_ddls(region_id) {
                // Safety: The region has pending ddls.
                self.flush_scheduler.add_bulk_request_to_pending(bulk_req);
                continue;
            }

            // Checks whether the region exists and whether it is stalling.
            if let hash_map::Entry::Vacant(e) = region_ctxs.entry(region_id) {
                let Some(region) = self.regions.get_region_or(region_id, &mut bulk_req.sender)
                else {
                    continue;
                };
                match region.state() {
                    RegionRoleState::Leader(RegionLeaderState::Writable) => {
                        let region_ctx = RegionWriteCtx::new(
                            region.region_id,
                            &region.version_control,
                            region.provider.clone(),
                            Some(region.written_bytes.clone()),
                        );

                        e.insert(region_ctx);
                    }
                    RegionRoleState::Leader(RegionLeaderState::Altering) => {
                        debug!(
                            "Region {} is altering, add request to pending writes",
                            region.region_id
                        );
                        self.stalling_count.add(1);
                        WRITE_STALL_TOTAL.inc();
                        self.stalled_requests.push_bulk(bulk_req);
                        continue;
                    }
                    state => {
                        // The region is not writable.
                        bulk_req.sender.send(
                            RegionStateSnafu {
                                region_id,
                                state,
                                expect: RegionRoleState::Leader(RegionLeaderState::Writable),
                            }
                            .fail(),
                        );
                        continue;
                    }
                }
            }

            // Safety: Now we ensure the region exists.
            let region_ctx = region_ctxs.get_mut(&region_id).unwrap();

            // Double-check the request schema
            let need_fill_missing_columns = region_ctx.version().metadata.schema_version
                != bulk_req.region_metadata.schema_version;

            // Bulk inserts cannot fill missing columns yet; reject on schema mismatch.
            if need_fill_missing_columns {
                // todo(hl): support filling default columns
                bulk_req.sender.send(
                    InvalidRequestSnafu {
                        region_id,
                        reason: "Schema mismatch",
                    }
                    .fail(),
                );
                continue;
            }

            // Collect requests by region.
            if !region_ctx.push_bulk(bulk_req.sender, bulk_req.request) {
                continue;
            }
        }
    }

    /// Returns true if the engine needs to reject some write requests.
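    ///
    /// The check is `memory_usage + stalled_requests.estimated_size >=
    /// global_write_buffer_reject_size`.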
    pub(crate) fn should_reject_write(&self) -> bool {
        // The estimated size of stalled requests also counts toward the threshold.
        self.write_buffer_manager.memory_usage() + self.stalled_requests.estimated_size
            >= self.config.global_write_buffer_reject_size.as_bytes() as usize
    }
}

/// Sends a rejected error to every request in `write_requests` and `bulk_requests`.
fn reject_write_requests(
    write_requests: &mut Vec<SenderWriteRequest>,
    bulk_requests: &mut Vec<SenderBulkRequest>,
) {
    WRITE_REJECT_TOTAL.inc_by(write_requests.len() as u64);

    for req in write_requests.drain(..) {
        req.sender.send(
            RejectWriteSnafu {
                region_id: req.request.region_id,
            }
            .fail(),
        );
    }
    for req in bulk_requests.drain(..) {
        let region_id = req.region_id;
        req.sender.send(RejectWriteSnafu { region_id }.fail());
    }
}

/// Rejects delete requests under append mode.
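///
/// For example, a `WriteRequest` with `op_type == OpType::Delete` fails with an
/// `InvalidRequest` error when `append_mode` is true, while `Put` requests always pass.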
fn check_op_type(append_mode: bool, request: &WriteRequest) -> Result<()> {
    if append_mode {
        ensure!(
            request.op_type == OpType::Put,
            InvalidRequestSnafu {
                region_id: request.region_id,
                reason: "DELETE is not allowed under append mode",
            }
        );
    }

    Ok(())
}