// mito2/worker/handle_write.rs
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Handling write requests.

17use std::collections::{hash_map, HashMap};
18use std::sync::Arc;
19
20use api::v1::OpType;
21use common_telemetry::{debug, error};
22use common_wal::options::WalOptions;
23use snafu::ensure;
24use store_api::codec::PrimaryKeyEncoding;
25use store_api::logstore::LogStore;
26use store_api::storage::RegionId;
27
28use crate::error::{InvalidRequestSnafu, RegionStateSnafu, RejectWriteSnafu, Result};
29use crate::metrics;
30use crate::metrics::{WRITE_REJECT_TOTAL, WRITE_ROWS_TOTAL, WRITE_STAGE_ELAPSED};
31use crate::region::{RegionLeaderState, RegionRoleState};
32use crate::region_write_ctx::RegionWriteCtx;
33use crate::request::{SenderBulkRequest, SenderWriteRequest, WriteRequest};
34use crate::worker::RegionWorkerLoop;
35
36impl<S: LogStore> RegionWorkerLoop<S> {
    /// Takes and handles all write requests.
    ///
    /// Drains `write_requests` and `bulk_requests` and answers every request
    /// through its embedded sender. Depending on memory pressure the requests
    /// are rejected, stalled for later retry (only when `allow_stall` is true),
    /// or written to the WAL and then applied to the memtables. Pass
    /// `allow_stall = false` when re-processing requests that already stalled
    /// once, to avoid stalling them again.
    pub(crate) async fn handle_write_requests(
        &mut self,
        write_requests: &mut Vec<SenderWriteRequest>,
        bulk_requests: &mut Vec<SenderBulkRequest>,
        allow_stall: bool,
    ) {
        if write_requests.is_empty() && bulk_requests.is_empty() {
            return;
        }

        // Flush this worker if the engine needs to flush.
        self.maybe_flush_worker();

        if self.should_reject_write() {
            // The memory pressure is still too high, reject write requests.
            reject_write_requests(write_requests, bulk_requests);
            // Also reject all stalled requests.
            self.reject_stalled_requests();
            return;
        }

        if self.write_buffer_manager.should_stall() && allow_stall {
            // Park the requests for later retry instead of writing them now.
            self.stalled_count
                .add((write_requests.len() + bulk_requests.len()) as i64);
            self.stalled_requests.append(write_requests, bulk_requests);
            self.listener.on_write_stall();
            return;
        }

        // Prepare write context.
        let mut region_ctxs = {
            let _timer = WRITE_STAGE_ELAPSED
                .with_label_values(&["prepare_ctx"])
                .start_timer();
            self.prepare_region_write_ctx(write_requests, bulk_requests)
        };

        // Write to WAL.
        {
            let _timer = WRITE_STAGE_ELAPSED
                .with_label_values(&["write_wal"])
                .start_timer();
            let mut wal_writer = self.wal.writer();
            for region_ctx in region_ctxs.values_mut() {
                if let WalOptions::Noop = &region_ctx.version().options.wal_options {
                    // Skip wal write for noop region.
                    continue;
                }
                // An encode failure only fails this region, not the whole batch;
                // the error is recorded on the region context for its senders.
                if let Err(e) = region_ctx.add_wal_entry(&mut wal_writer).map_err(Arc::new) {
                    region_ctx.set_error(e);
                }
            }
            match wal_writer.write_to_wal().await.map_err(Arc::new) {
                Ok(response) => {
                    for (region_id, region_ctx) in region_ctxs.iter_mut() {
                        if let WalOptions::Noop = &region_ctx.version().options.wal_options {
                            continue;
                        }

                        // Safety: the log store implementation ensures that either the `write_to_wal` fails and no
                        // response is returned or the last entry ids for each region do exist.
                        let last_entry_id = response.last_entry_ids.get(region_id).unwrap();
                        region_ctx.set_next_entry_id(last_entry_id + 1);
                    }
                }
                Err(e) => {
                    // Failed to write wal: record the shared error on every
                    // region context and skip the memtable stage entirely.
                    for mut region_ctx in region_ctxs.into_values() {
                        region_ctx.set_error(e.clone());
                    }
                    return;
                }
            }
        }

        let (mut put_rows, mut delete_rows) = (0, 0);
        // Write to memtables.
        {
            let _timer = WRITE_STAGE_ELAPSED
                .with_label_values(&["write_memtable"])
                .start_timer();
            if region_ctxs.len() == 1 {
                // fast path for single region.
                let mut region_ctx = region_ctxs.into_values().next().unwrap();
                region_ctx.write_memtable().await;
                region_ctx.write_bulk().await;
                put_rows += region_ctx.put_num;
                delete_rows += region_ctx.delete_num;
            } else {
                let region_write_task = region_ctxs
                    .into_values()
                    .map(|mut region_ctx| {
                        // use tokio runtime to schedule tasks.
                        common_runtime::spawn_global(async move {
                            region_ctx.write_memtable().await;
                            region_ctx.write_bulk().await;
                            (region_ctx.put_num, region_ctx.delete_num)
                        })
                    })
                    .collect::<Vec<_>>();

                for result in futures::future::join_all(region_write_task).await {
                    match result {
                        Ok((put, delete)) => {
                            put_rows += put;
                            delete_rows += delete;
                        }
                        Err(e) => {
                            // A join error means the spawned task panicked or was
                            // cancelled; log it and keep aggregating the others.
                            error!(e; "unexpected error when joining region write tasks");
                        }
                    }
                }
            }
        }
        WRITE_ROWS_TOTAL
            .with_label_values(&["put"])
            .inc_by(put_rows as u64);
        WRITE_ROWS_TOTAL
            .with_label_values(&["delete"])
            .inc_by(delete_rows as u64);
    }
159
160    /// Handles all stalled write requests.
161    pub(crate) async fn handle_stalled_requests(&mut self) {
162        // Handle stalled requests.
163        let stalled = std::mem::take(&mut self.stalled_requests);
164        self.stalled_count.sub(stalled.stalled_count() as i64);
165        // We already stalled these requests, don't stall them again.
166        for (_, (_, mut requests, mut bulk)) in stalled.requests {
167            self.handle_write_requests(&mut requests, &mut bulk, false)
168                .await;
169        }
170    }
171
172    /// Rejects all stalled requests.
173    pub(crate) fn reject_stalled_requests(&mut self) {
174        let stalled = std::mem::take(&mut self.stalled_requests);
175        self.stalled_count.sub(stalled.stalled_count() as i64);
176        for (_, (_, mut requests, mut bulk)) in stalled.requests {
177            reject_write_requests(&mut requests, &mut bulk);
178        }
179    }
180
181    /// Rejects a specific region's stalled requests.
182    pub(crate) fn reject_region_stalled_requests(&mut self, region_id: &RegionId) {
183        debug!("Rejects stalled requests for region {}", region_id);
184        let (mut requests, mut bulk) = self.stalled_requests.remove(region_id);
185        self.stalled_count.sub((requests.len() + bulk.len()) as i64);
186        reject_write_requests(&mut requests, &mut bulk);
187    }
188
189    /// Handles a specific region's stalled requests.
190    pub(crate) async fn handle_region_stalled_requests(&mut self, region_id: &RegionId) {
191        debug!("Handles stalled requests for region {}", region_id);
192        let (mut requests, mut bulk) = self.stalled_requests.remove(region_id);
193        self.stalled_count.sub((requests.len() + bulk.len()) as i64);
194        self.handle_write_requests(&mut requests, &mut bulk, true)
195            .await;
196    }
197}
198
199impl<S> RegionWorkerLoop<S> {
200    /// Validates and groups requests by region.
201    fn prepare_region_write_ctx(
202        &mut self,
203        write_requests: &mut Vec<SenderWriteRequest>,
204        bulk_requests: &mut Vec<SenderBulkRequest>,
205    ) -> HashMap<RegionId, RegionWriteCtx> {
206        // Initialize region write context map.
207        let mut region_ctxs = HashMap::new();
208        self.process_write_requests(&mut region_ctxs, write_requests);
209        self.process_bulk_requests(&mut region_ctxs, bulk_requests);
210        region_ctxs
211    }
212
    /// Validates row write requests and groups them by region into `region_ctxs`.
    ///
    /// Requests for regions with pending DDLs are parked on the flush scheduler,
    /// requests for altering regions are stalled, and requests for missing or
    /// non-writable regions are answered with an error through their senders.
    fn process_write_requests(
        &mut self,
        region_ctxs: &mut HashMap<RegionId, RegionWriteCtx>,
        write_requests: &mut Vec<SenderWriteRequest>,
    ) {
        for mut sender_req in write_requests.drain(..) {
            let region_id = sender_req.request.region_id;

            // If region is waiting for alteration, add requests to pending writes.
            if self.flush_scheduler.has_pending_ddls(region_id) {
                // TODO(yingwen): consider adding some metrics for this.
                // Safety: The region has pending ddls.
                self.flush_scheduler
                    .add_write_request_to_pending(sender_req);
                continue;
            }

            // Checks whether the region exists and is it stalling.
            if let hash_map::Entry::Vacant(e) = region_ctxs.entry(region_id) {
                let Some(region) = self
                    .regions
                    .get_region_or(region_id, &mut sender_req.sender)
                else {
                    // No such region; `get_region_or` has already notified the sender.
                    continue;
                };
                match region.state() {
                    RegionRoleState::Leader(RegionLeaderState::Writable) => {
                        let region_ctx = RegionWriteCtx::new(
                            region.region_id,
                            &region.version_control,
                            region.provider.clone(),
                        );

                        e.insert(region_ctx);
                    }
                    RegionRoleState::Leader(RegionLeaderState::Altering) => {
                        debug!(
                            "Region {} is altering, add request to pending writes",
                            region.region_id
                        );
                        self.stalled_count.add(1);
                        self.stalled_requests.push(sender_req);
                        continue;
                    }
                    state => {
                        // The region is not writable.
                        sender_req.sender.send(
                            RegionStateSnafu {
                                region_id,
                                state,
                                expect: RegionRoleState::Leader(RegionLeaderState::Writable),
                            }
                            .fail(),
                        );
                        continue;
                    }
                }
            }

            // Safety: Now we ensure the region exists.
            let region_ctx = region_ctxs.get_mut(&region_id).unwrap();

            if let Err(e) = check_op_type(
                region_ctx.version().options.append_mode,
                &sender_req.request,
            ) {
                // Do not allow non-put op under append mode.
                sender_req.sender.send(Err(e));

                continue;
            }

            // Double check the request schema: a missing metadata means we
            // cannot prove the schema matches, so treat it as mismatched.
            let need_fill_missing_columns =
                if let Some(ref region_metadata) = sender_req.request.region_metadata {
                    region_ctx.version().metadata.schema_version != region_metadata.schema_version
                } else {
                    true
                };
            // Only fill missing columns if primary key is dense encoded.
            if need_fill_missing_columns
                && sender_req.request.primary_key_encoding() == PrimaryKeyEncoding::Dense
            {
                if let Err(e) = sender_req
                    .request
                    .maybe_fill_missing_columns(&region_ctx.version().metadata)
                {
                    sender_req.sender.send(Err(e));

                    continue;
                }
            }

            // Collect requests by region.
            region_ctx.push_mutation(
                sender_req.request.op_type as i32,
                Some(sender_req.request.rows),
                sender_req.request.hint,
                sender_req.sender,
            );
        }
    }
316
317    /// Processes bulk insert requests.
318    fn process_bulk_requests(
319        &mut self,
320        region_ctxs: &mut HashMap<RegionId, RegionWriteCtx>,
321        requests: &mut Vec<SenderBulkRequest>,
322    ) {
323        let _timer = metrics::REGION_WORKER_HANDLE_WRITE_ELAPSED
324            .with_label_values(&["prepare_bulk_request"])
325            .start_timer();
326        for mut bulk_req in requests.drain(..) {
327            let region_id = bulk_req.region_id;
328            // If region is waiting for alteration, add requests to pending writes.
329            if self.flush_scheduler.has_pending_ddls(region_id) {
330                // Safety: The region has pending ddls.
331                self.flush_scheduler.add_bulk_request_to_pending(bulk_req);
332                continue;
333            }
334
335            // Checks whether the region exists and is it stalling.
336            if let hash_map::Entry::Vacant(e) = region_ctxs.entry(region_id) {
337                let Some(region) = self.regions.get_region_or(region_id, &mut bulk_req.sender)
338                else {
339                    continue;
340                };
341                match region.state() {
342                    RegionRoleState::Leader(RegionLeaderState::Writable) => {
343                        let region_ctx = RegionWriteCtx::new(
344                            region.region_id,
345                            &region.version_control,
346                            region.provider.clone(),
347                        );
348
349                        e.insert(region_ctx);
350                    }
351                    RegionRoleState::Leader(RegionLeaderState::Altering) => {
352                        debug!(
353                            "Region {} is altering, add request to pending writes",
354                            region.region_id
355                        );
356                        self.stalled_count.add(1);
357                        self.stalled_requests.push_bulk(bulk_req);
358                        continue;
359                    }
360                    state => {
361                        // The region is not writable.
362                        bulk_req.sender.send(
363                            RegionStateSnafu {
364                                region_id,
365                                state,
366                                expect: RegionRoleState::Leader(RegionLeaderState::Writable),
367                            }
368                            .fail(),
369                        );
370                        continue;
371                    }
372                }
373            }
374
375            // Safety: Now we ensure the region exists.
376            let region_ctx = region_ctxs.get_mut(&region_id).unwrap();
377
378            // Double-check the request schema
379            let need_fill_missing_columns = region_ctx.version().metadata.schema_version
380                != bulk_req.region_metadata.schema_version;
381
382            // Only fill missing columns if primary key is dense encoded.
383            if need_fill_missing_columns {
384                // todo(hl): support filling default columns
385                bulk_req.sender.send(
386                    InvalidRequestSnafu {
387                        region_id,
388                        reason: "Schema mismatch",
389                    }
390                    .fail(),
391                );
392                return;
393            }
394
395            // Collect requests by region.
396            region_ctx.push_bulk(bulk_req.sender, bulk_req.request);
397        }
398    }
399
400    /// Returns true if the engine needs to reject some write requests.
401    pub(crate) fn should_reject_write(&self) -> bool {
402        // If memory usage reaches high threshold (we should also consider stalled requests) returns true.
403        self.write_buffer_manager.memory_usage() + self.stalled_requests.estimated_size
404            >= self.config.global_write_buffer_reject_size.as_bytes() as usize
405    }
406}
407
408/// Send rejected error to all `write_requests`.
409fn reject_write_requests(
410    write_requests: &mut Vec<SenderWriteRequest>,
411    bulk_requests: &mut Vec<SenderBulkRequest>,
412) {
413    WRITE_REJECT_TOTAL.inc_by(write_requests.len() as u64);
414
415    for req in write_requests.drain(..) {
416        req.sender.send(
417            RejectWriteSnafu {
418                region_id: req.request.region_id,
419            }
420            .fail(),
421        );
422    }
423    for req in bulk_requests.drain(..) {
424        let region_id = req.region_id;
425        req.sender.send(RejectWriteSnafu { region_id }.fail());
426    }
427}
428
429/// Rejects delete request under append mode.
430fn check_op_type(append_mode: bool, request: &WriteRequest) -> Result<()> {
431    if append_mode {
432        ensure!(
433            request.op_type == OpType::Put,
434            InvalidRequestSnafu {
435                region_id: request.region_id,
436                reason: "DELETE is not allowed under append mode",
437            }
438        );
439    }
440
441    Ok(())
442}