1use std::collections::{HashMap, VecDeque};
20use std::num::NonZeroU64;
21use std::sync::Arc;
22
23use common_telemetry::{info, warn};
24use store_api::logstore::LogStore;
25use store_api::storage::RegionId;
26
27use crate::cache::CacheManagerRef;
28use crate::cache::file_cache::{FileType, IndexKey};
29use crate::config::IndexBuildMode;
30use crate::error::{RegionBusySnafu, RegionNotFoundSnafu, Result};
31use crate::manifest::action::{
32    RegionChange, RegionEdit, RegionMetaAction, RegionMetaActionList, RegionTruncate,
33};
34use crate::metrics::WRITE_CACHE_INFLIGHT_DOWNLOAD;
35use crate::region::version::VersionBuilder;
36use crate::region::{MitoRegionRef, RegionLeaderState, RegionRoleState};
37use crate::request::{
38    BackgroundNotify, BuildIndexRequest, OptionOutputTx, RegionChangeResult, RegionEditRequest,
39    RegionEditResult, RegionSyncRequest, TruncateResult, WorkerRequest, WorkerRequestWithTime,
40};
41use crate::sst::index::IndexBuildType;
42use crate::sst::location;
43use crate::worker::{RegionWorkerLoop, WorkerListener};
44
45pub(crate) type RegionEditQueues = HashMap<RegionId, RegionEditQueue>;
46
47pub(crate) struct RegionEditQueue {
52    region_id: RegionId,
53    requests: VecDeque<RegionEditRequest>,
54}
55
56impl RegionEditQueue {
57    const QUEUE_MAX_LEN: usize = 128;
58
59    fn new(region_id: RegionId) -> Self {
60        Self {
61            region_id,
62            requests: VecDeque::new(),
63        }
64    }
65
66    fn enqueue(&mut self, request: RegionEditRequest) {
67        if self.requests.len() > Self::QUEUE_MAX_LEN {
68            let _ = request.tx.send(
69                RegionBusySnafu {
70                    region_id: self.region_id,
71                }
72                .fail(),
73            );
74            return;
75        };
76        self.requests.push_back(request);
77    }
78
79    fn dequeue(&mut self) -> Option<RegionEditRequest> {
80        self.requests.pop_front()
81    }
82}
83
84impl<S: LogStore> RegionWorkerLoop<S> {
85    pub(crate) async fn handle_manifest_region_change_result(
87        &mut self,
88        change_result: RegionChangeResult,
89    ) {
90        let region = match self.regions.get_region(change_result.region_id) {
91            Some(region) => region,
92            None => {
93                self.reject_region_stalled_requests(&change_result.region_id);
94                change_result.sender.send(
95                    RegionNotFoundSnafu {
96                        region_id: change_result.region_id,
97                    }
98                    .fail(),
99                );
100                return;
101            }
102        };
103
104        if change_result.result.is_ok() {
105            region
107                .version_control
108                .alter_schema(change_result.new_meta, ®ion.memtable_builder);
109
110            let version = region.version();
111            info!(
112                "Region {} is altered, metadata is {:?}, options: {:?}",
113                region.region_id, version.metadata, version.options,
114            );
115        }
116
117        region.switch_state_to_writable(RegionLeaderState::Altering);
119        change_result.sender.send(change_result.result.map(|_| 0));
121
122        if self.config.index.build_mode == IndexBuildMode::Async && change_result.need_index {
124            self.handle_rebuild_index(
125                BuildIndexRequest {
126                    region_id: region.region_id,
127                    build_type: IndexBuildType::SchemaChange,
128                    file_metas: Vec::new(),
129                },
130                OptionOutputTx::new(None),
131            )
132            .await;
133        }
134        self.handle_region_stalled_requests(&change_result.region_id)
136            .await;
137    }
138
139    pub(crate) async fn handle_region_sync(&mut self, request: RegionSyncRequest) {
144        let region_id = request.region_id;
145        let sender = request.sender;
146        let region = match self.regions.follower_region(region_id) {
147            Ok(region) => region,
148            Err(e) => {
149                let _ = sender.send(Err(e));
150                return;
151            }
152        };
153
154        let original_manifest_version = region.manifest_ctx.manifest_version().await;
155        let manifest = match region
156            .manifest_ctx
157            .install_manifest_to(request.manifest_version)
158            .await
159        {
160            Ok(manifest) => manifest,
161            Err(e) => {
162                let _ = sender.send(Err(e));
163                return;
164            }
165        };
166        let version = region.version();
167        if !version.memtables.is_empty() {
168            let current = region.version_control.current();
169            warn!(
170                "Region {} memtables is not empty, which should not happen, manifest version: {}, last entry id: {}",
171                region.region_id, manifest.manifest_version, current.last_entry_id
172            );
173        }
174        let region_options = version.options.clone();
175        let new_mutable = Arc::new(
176            region
177                .version()
178                .memtables
179                .mutable
180                .new_with_part_duration(version.compaction_time_window),
181        );
182        let metadata = manifest.metadata.clone();
183        let version = VersionBuilder::new(metadata, new_mutable)
184            .add_files(region.file_purger.clone(), manifest.files.values().cloned())
185            .flushed_entry_id(manifest.flushed_entry_id)
186            .flushed_sequence(manifest.flushed_sequence)
187            .truncated_entry_id(manifest.truncated_entry_id)
188            .compaction_time_window(manifest.compaction_time_window)
189            .options(region_options)
190            .build();
191        region.version_control.overwrite_current(Arc::new(version));
192
193        let updated = manifest.manifest_version > original_manifest_version;
194        let _ = sender.send(Ok((manifest.manifest_version, updated)));
195    }
196}
197
198impl<S> RegionWorkerLoop<S> {
199    pub(crate) async fn handle_region_edit(&mut self, request: RegionEditRequest) {
201        let region_id = request.region_id;
202        let Some(region) = self.regions.get_region(region_id) else {
203            let _ = request.tx.send(RegionNotFoundSnafu { region_id }.fail());
204            return;
205        };
206
207        if !region.is_writable() {
208            if region.state() == RegionRoleState::Leader(RegionLeaderState::Editing) {
209                self.region_edit_queues
210                    .entry(region_id)
211                    .or_insert_with(|| RegionEditQueue::new(region_id))
212                    .enqueue(request);
213            } else {
214                let _ = request.tx.send(RegionBusySnafu { region_id }.fail());
215            }
216            return;
217        }
218
219        let RegionEditRequest {
220            region_id: _,
221            mut edit,
222            tx: sender,
223        } = request;
224        let file_sequence = region.version_control.committed_sequence() + 1;
225        edit.committed_sequence = Some(file_sequence);
226
227        for file in &mut edit.files_to_add {
229            file.sequence = NonZeroU64::new(file_sequence);
230        }
231
232        if let Err(e) = region.set_editing() {
234            let _ = sender.send(Err(e));
235            return;
236        }
237
238        let request_sender = self.sender.clone();
239        let cache_manager = self.cache_manager.clone();
240        let listener = self.listener.clone();
241        common_runtime::spawn_global(async move {
244            let result = edit_region(®ion, edit.clone(), cache_manager, listener).await;
245            let notify = WorkerRequest::Background {
246                region_id,
247                notify: BackgroundNotify::RegionEdit(RegionEditResult {
248                    region_id,
249                    sender,
250                    edit,
251                    result,
252                }),
253            };
254
255            if let Err(res) = request_sender
257                .send(WorkerRequestWithTime::new(notify))
258                .await
259            {
260                warn!(
261                    "Failed to send region edit result back to the worker, region_id: {}, res: {:?}",
262                    region_id, res
263                );
264            }
265        });
266    }
267
268    pub(crate) async fn handle_region_edit_result(&mut self, edit_result: RegionEditResult) {
270        let region = match self.regions.get_region(edit_result.region_id) {
271            Some(region) => region,
272            None => {
273                let _ = edit_result.sender.send(
274                    RegionNotFoundSnafu {
275                        region_id: edit_result.region_id,
276                    }
277                    .fail(),
278                );
279                return;
280            }
281        };
282
283        let need_compaction =
284            edit_result.result.is_ok() && !edit_result.edit.files_to_add.is_empty();
285
286        if edit_result.result.is_ok() {
287            region.version_control.apply_edit(
289                Some(edit_result.edit),
290                &[],
291                region.file_purger.clone(),
292            );
293        }
294
295        region.switch_state_to_writable(RegionLeaderState::Editing);
297
298        let _ = edit_result.sender.send(edit_result.result);
299
300        if let Some(edit_queue) = self.region_edit_queues.get_mut(&edit_result.region_id)
301            && let Some(request) = edit_queue.dequeue()
302        {
303            self.handle_region_edit(request).await;
304        }
305
306        if need_compaction {
307            self.schedule_compaction(®ion).await;
308        }
309    }
310
311    pub(crate) fn handle_manifest_truncate_action(
313        &self,
314        region: MitoRegionRef,
315        truncate: RegionTruncate,
316        sender: OptionOutputTx,
317    ) {
318        if let Err(e) = region.set_truncating() {
321            sender.send(Err(e));
322            return;
323        }
324        let request_sender = self.sender.clone();
327        let manifest_ctx = region.manifest_ctx.clone();
328
329        common_runtime::spawn_global(async move {
331            let action_list =
333                RegionMetaActionList::with_action(RegionMetaAction::Truncate(truncate.clone()));
334
335            let result = manifest_ctx
336                .update_manifest(RegionLeaderState::Truncating, action_list)
337                .await
338                .map(|_| ());
339
340            let truncate_result = TruncateResult {
342                region_id: truncate.region_id,
343                sender,
344                result,
345                kind: truncate.kind,
346            };
347            let _ = request_sender
348                .send(WorkerRequestWithTime::new(WorkerRequest::Background {
349                    region_id: truncate.region_id,
350                    notify: BackgroundNotify::Truncate(truncate_result),
351                }))
352                .await
353                .inspect_err(|_| warn!("failed to send truncate result"));
354        });
355    }
356
357    pub(crate) fn handle_manifest_region_change(
359        &self,
360        region: MitoRegionRef,
361        change: RegionChange,
362        need_index: bool,
363        sender: OptionOutputTx,
364    ) {
365        if let Err(e) = region.set_altering() {
367            sender.send(Err(e));
368            return;
369        }
370        let listener = self.listener.clone();
371        let request_sender = self.sender.clone();
372        common_runtime::spawn_global(async move {
374            let new_meta = change.metadata.clone();
375            let action_list = RegionMetaActionList::with_action(RegionMetaAction::Change(change));
376
377            let result = region
378                .manifest_ctx
379                .update_manifest(RegionLeaderState::Altering, action_list)
380                .await
381                .map(|_| ());
382            let notify = WorkerRequest::Background {
383                region_id: region.region_id,
384                notify: BackgroundNotify::RegionChange(RegionChangeResult {
385                    region_id: region.region_id,
386                    sender,
387                    result,
388                    new_meta,
389                    need_index,
390                }),
391            };
392            listener
393                .on_notify_region_change_result_begin(region.region_id)
394                .await;
395
396            if let Err(res) = request_sender
397                .send(WorkerRequestWithTime::new(notify))
398                .await
399            {
400                warn!(
401                    "Failed to send region change result back to the worker, region_id: {}, res: {:?}",
402                    region.region_id, res
403                );
404            }
405        });
406    }
407}
408
409async fn edit_region(
411    region: &MitoRegionRef,
412    edit: RegionEdit,
413    cache_manager: CacheManagerRef,
414    listener: WorkerListener,
415) -> Result<()> {
416    let region_id = region.region_id;
417    if let Some(write_cache) = cache_manager.write_cache() {
418        for file_meta in &edit.files_to_add {
419            let write_cache = write_cache.clone();
420            let layer = region.access_layer.clone();
421            let listener = listener.clone();
422
423            let index_key = IndexKey::new(region_id, file_meta.file_id, FileType::Parquet);
424            let remote_path =
425                location::sst_file_path(layer.table_dir(), file_meta.file_id(), layer.path_type());
426
427            let is_index_exist = file_meta.exists_index();
428            let index_file_size = file_meta.index_file_size();
429
430            let index_file_index_key =
431                IndexKey::new(region_id, file_meta.file_id, FileType::Puffin);
432            let index_remote_path = location::index_file_path(
433                layer.table_dir(),
434                file_meta.file_id(),
435                layer.path_type(),
436            );
437
438            let file_size = file_meta.file_size;
439            common_runtime::spawn_global(async move {
440                WRITE_CACHE_INFLIGHT_DOWNLOAD.add(1);
441
442                if write_cache
443                    .download(index_key, &remote_path, layer.object_store(), file_size)
444                    .await
445                    .is_ok()
446                {
447                    let _ = write_cache
450                        .file_cache()
451                        .get_parquet_meta_data(index_key)
452                        .await;
453
454                    listener.on_file_cache_filled(index_key.file_id);
455                }
456                if is_index_exist {
457                    if let Err(err) = write_cache
459                        .download(
460                            index_file_index_key,
461                            &index_remote_path,
462                            layer.object_store(),
463                            index_file_size,
464                        )
465                        .await
466                    {
467                        common_telemetry::error!(
468                            err; "Failed to download puffin file, region_id: {}, index_file_index_key: {:?}, index_remote_path: {}", region_id, index_file_index_key, index_remote_path
469                        );
470                    }
471                }
472
473                WRITE_CACHE_INFLIGHT_DOWNLOAD.sub(1);
474            });
475        }
476    }
477
478    info!("Applying {edit:?} to region {}", region_id);
479
480    let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit));
481    region
482        .manifest_ctx
483        .update_manifest(RegionLeaderState::Editing, action_list)
484        .await
485        .map(|_| ())
486}