mito2/worker/handle_write.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Handling write requests.

use std::collections::{hash_map, HashMap};
use std::sync::Arc;

use api::v1::OpType;
use common_telemetry::{debug, error};
use common_wal::options::WalOptions;
use snafu::ensure;
use store_api::codec::PrimaryKeyEncoding;
use store_api::logstore::LogStore;
use store_api::storage::RegionId;

use crate::error::{InvalidRequestSnafu, RegionStateSnafu, RejectWriteSnafu, Result};
use crate::metrics::{WRITE_REJECT_TOTAL, WRITE_ROWS_TOTAL, WRITE_STAGE_ELAPSED};
use crate::region::{RegionLeaderState, RegionRoleState};
use crate::region_write_ctx::RegionWriteCtx;
use crate::request::{SenderWriteRequest, WriteRequest};
use crate::worker::RegionWorkerLoop;

impl<S: LogStore> RegionWorkerLoop<S> {
    /// Takes and handles all write requests.
    pub(crate) async fn handle_write_requests(
        &mut self,
        write_requests: &mut Vec<SenderWriteRequest>,
        allow_stall: bool,
    ) {
        if write_requests.is_empty() {
            return;
        }

        // Flush this worker if the engine needs to flush.
        self.maybe_flush_worker();

        if self.should_reject_write() {
            // The memory pressure is still too high; reject the incoming write requests.
            reject_write_requests(write_requests);
            // Also reject all stalled requests.
            self.reject_stalled_requests();
            return;
        }

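        // Below the reject threshold the write buffer may still be over the
        // stall threshold; in that case, park the requests instead of failing
        // them, unless the caller forbids stalling (e.g. when retrying
        // requests that were already stalled once).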
        if self.write_buffer_manager.should_stall() && allow_stall {
            self.stalled_count.add(write_requests.len() as i64);
            self.stalled_requests.append(write_requests);
            self.listener.on_write_stall();
            return;
        }

        // Prepare write context.
        let mut region_ctxs = {
            let _timer = WRITE_STAGE_ELAPSED
                .with_label_values(&["prepare_ctx"])
                .start_timer();
            self.prepare_region_write_ctx(write_requests)
        };

        // Write to WAL.
        {
            let _timer = WRITE_STAGE_ELAPSED
                .with_label_values(&["write_wal"])
                .start_timer();
            let mut wal_writer = self.wal.writer();
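            // Entries are staged per region first; the actual IO happens in a
            // single batched `write_to_wal` call below.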
            for region_ctx in region_ctxs.values_mut() {
                if let WalOptions::Noop = &region_ctx.version().options.wal_options {
                    // Skip the WAL write for no-op regions.
                    continue;
                }
                if let Err(e) = region_ctx.add_wal_entry(&mut wal_writer).map_err(Arc::new) {
                    region_ctx.set_error(e);
                }
            }
            match wal_writer.write_to_wal().await.map_err(Arc::new) {
                Ok(response) => {
                    for (region_id, region_ctx) in region_ctxs.iter_mut() {
                        if let WalOptions::Noop = &region_ctx.version().options.wal_options {
                            continue;
                        }

                        // Safety: the log store implementation ensures that either `write_to_wal`
                        // fails and no response is returned, or the last entry ids for each region
                        // do exist.
                        let last_entry_id = response.last_entry_ids.get(region_id).unwrap();
                        region_ctx.set_next_entry_id(last_entry_id + 1);
                    }
                }
                Err(e) => {
                    // Failed to write the WAL; record the error on every region
                    // context so it reaches the pending senders.
                    for mut region_ctx in region_ctxs.into_values() {
                        region_ctx.set_error(e.clone());
                    }
                    return;
                }
            }
        }

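        // At this point the WAL entries of all error-free regions have been
        // handed to the log store, so the mutations can be applied to the
        // memtables.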
        let (mut put_rows, mut delete_rows) = (0, 0);
        // Write to memtables.
        {
            let _timer = WRITE_STAGE_ELAPSED
                .with_label_values(&["write_memtable"])
                .start_timer();
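            // A single region is written inline on this worker; writes to
            // multiple regions are spawned onto the global runtime and joined
            // so they can proceed concurrently.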
            if region_ctxs.len() == 1 {
                // Fast path for a single region.
                let mut region_ctx = region_ctxs.into_values().next().unwrap();
                region_ctx.write_memtable().await;
                put_rows += region_ctx.put_num;
                delete_rows += region_ctx.delete_num;
            } else {
                let region_write_task = region_ctxs
                    .into_values()
                    .map(|mut region_ctx| {
                        // Use the tokio runtime to schedule the tasks.
                        common_runtime::spawn_global(async move {
                            region_ctx.write_memtable().await;
                            (region_ctx.put_num, region_ctx.delete_num)
                        })
                    })
                    .collect::<Vec<_>>();

                for result in futures::future::join_all(region_write_task).await {
                    match result {
                        Ok((put, delete)) => {
                            put_rows += put;
                            delete_rows += delete;
                        }
                        Err(e) => {
                            error!(e; "unexpected error when joining region write tasks");
                        }
                    }
                }
            }
        }
        WRITE_ROWS_TOTAL
            .with_label_values(&["put"])
            .inc_by(put_rows as u64);
        WRITE_ROWS_TOTAL
            .with_label_values(&["delete"])
            .inc_by(delete_rows as u64);
    }

    /// Handles all stalled write requests.
    pub(crate) async fn handle_stalled_requests(&mut self) {
        // Handle stalled requests.
        let stalled = std::mem::take(&mut self.stalled_requests);
        self.stalled_count.sub(stalled.stalled_count() as i64);
        // These requests were already stalled once; don't stall them again.
        for (_, (_, mut requests)) in stalled.requests {
            self.handle_write_requests(&mut requests, false).await;
        }
    }

    /// Rejects all stalled requests.
    pub(crate) fn reject_stalled_requests(&mut self) {
        let stalled = std::mem::take(&mut self.stalled_requests);
        self.stalled_count.sub(stalled.stalled_count() as i64);
        for (_, (_, mut requests)) in stalled.requests {
            reject_write_requests(&mut requests);
        }
    }

    /// Rejects a specific region's stalled requests.
    pub(crate) fn reject_region_stalled_requests(&mut self, region_id: &RegionId) {
        debug!("Rejects stalled requests for region {}", region_id);
        let mut requests = self.stalled_requests.remove(region_id);
        self.stalled_count.sub(requests.len() as i64);
        reject_write_requests(&mut requests);
    }

    /// Handles a specific region's stalled requests.
    pub(crate) async fn handle_region_stalled_requests(&mut self, region_id: &RegionId) {
        debug!("Handles stalled requests for region {}", region_id);
        let mut requests = self.stalled_requests.remove(region_id);
        self.stalled_count.sub(requests.len() as i64);
        self.handle_write_requests(&mut requests, true).await;
    }
}

impl<S> RegionWorkerLoop<S> {
    /// Validates and groups requests by region.
    fn prepare_region_write_ctx(
        &mut self,
        write_requests: &mut Vec<SenderWriteRequest>,
    ) -> HashMap<RegionId, RegionWriteCtx> {
        // Initialize region write context map.
        let mut region_ctxs = HashMap::new();
        for mut sender_req in write_requests.drain(..) {
            let region_id = sender_req.request.region_id;

            // If the region is waiting for alteration, add the request to the pending writes.
            if self.flush_scheduler.has_pending_ddls(region_id) {
                // TODO(yingwen): consider adding some metrics for this.
                // Safety: the region has pending DDLs.
                self.flush_scheduler
                    .add_write_request_to_pending(sender_req);
                continue;
            }

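            // Only the first request for each region pays for the lookup and
            // state check below; later requests reuse the cached context.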
            // Checks whether the region exists and whether it is writable;
            // requests to an altering region are stalled.
            if let hash_map::Entry::Vacant(e) = region_ctxs.entry(region_id) {
                let Some(region) = self
                    .regions
                    .get_region_or(region_id, &mut sender_req.sender)
                else {
                    // No such region.
                    continue;
                };
                match region.state() {
                    RegionRoleState::Leader(RegionLeaderState::Writable) => {
                        let region_ctx = RegionWriteCtx::new(
                            region.region_id,
                            &region.version_control,
                            region.provider.clone(),
                        );

                        e.insert(region_ctx);
                    }
                    RegionRoleState::Leader(RegionLeaderState::Altering) => {
                        debug!(
                            "Region {} is altering, add request to pending writes",
                            region.region_id
                        );
                        self.stalled_count.add(1);
                        self.stalled_requests.push(sender_req);
                        continue;
                    }
                    state => {
                        // The region is not writable.
                        sender_req.sender.send(
                            RegionStateSnafu {
                                region_id,
                                state,
                                expect: RegionRoleState::Leader(RegionLeaderState::Writable),
                            }
                            .fail(),
                        );
                        continue;
                    }
                }
            }

            // Safety: we just ensured the region exists in the map.
            let region_ctx = region_ctxs.get_mut(&region_id).unwrap();

            if let Err(e) = check_op_type(
                region_ctx.version().options.append_mode,
                &sender_req.request,
            ) {
                // Do not allow non-put ops under append mode.
                sender_req.sender.send(Err(e));

                continue;
            }

            // Double check the request schema.
            let need_fill_missing_columns =
                if let Some(ref region_metadata) = sender_req.request.region_metadata {
                    region_ctx.version().metadata.schema_version != region_metadata.schema_version
                } else {
                    true
                };
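            // A request without region metadata cannot be version-checked, so
            // it is conservatively treated as possibly missing columns.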
            // Only fill missing columns if the primary key is dense-encoded.
            if need_fill_missing_columns
                && sender_req.request.primary_key_encoding() == PrimaryKeyEncoding::Dense
            {
                if let Err(e) = sender_req
                    .request
                    .maybe_fill_missing_columns(&region_ctx.version().metadata)
                {
                    sender_req.sender.send(Err(e));

                    continue;
                }
            }

            // Collect requests by region.
            region_ctx.push_mutation(
                sender_req.request.op_type as i32,
                Some(sender_req.request.rows),
                sender_req.request.hint,
                sender_req.sender,
            );
        }

        region_ctxs
    }

    /// Returns true if the engine needs to reject some write requests.
    pub(crate) fn should_reject_write(&self) -> bool {
        // Reject when memory usage, plus the estimated size of stalled
        // requests, reaches the high (reject) threshold.
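        // For example, with a reject size of 1 GiB, writes are rejected once
        // the mutable buffers plus the estimated size of stalled requests
        // reach 1 GiB in total.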
        self.write_buffer_manager.memory_usage() + self.stalled_requests.estimated_size
            >= self.config.global_write_buffer_reject_size.as_bytes() as usize
    }
}

/// Sends a rejected error to all `write_requests`.
fn reject_write_requests(write_requests: &mut Vec<SenderWriteRequest>) {
    WRITE_REJECT_TOTAL.inc_by(write_requests.len() as u64);

    for req in write_requests.drain(..) {
        req.sender.send(
            RejectWriteSnafu {
                region_id: req.request.region_id,
            }
            .fail(),
        );
    }
}

/// Rejects delete requests under append mode.
fn check_op_type(append_mode: bool, request: &WriteRequest) -> Result<()> {
    if append_mode {
        ensure!(
            request.op_type == OpType::Put,
            InvalidRequestSnafu {
                region_id: request.region_id,
                reason: "DELETE is not allowed under append mode",
            }
        );
    }

    Ok(())
}