1use std::collections::{HashMap, VecDeque};
20use std::num::NonZeroU64;
21use std::sync::Arc;
22
23use common_telemetry::{debug, info, warn};
24use parquet::file::metadata::PageIndexPolicy;
25use snafu::ResultExt;
26use store_api::logstore::LogStore;
27use store_api::metadata::RegionMetadataRef;
28use store_api::storage::RegionId;
29
30use crate::cache::CacheManagerRef;
31use crate::cache::file_cache::{FileType, IndexKey};
32use crate::config::IndexBuildMode;
33use crate::error::{EditRegionSnafu, RegionBusySnafu, RegionNotFoundSnafu, Result};
34use crate::manifest::action::{
35 RegionChange, RegionEdit, RegionMetaAction, RegionMetaActionList, RegionTruncate,
36};
37use crate::memtable::MemtableBuilderProvider;
38use crate::metrics::WRITE_CACHE_INFLIGHT_DOWNLOAD;
39use crate::region::opener::{sanitize_region_options, version_builder_from_manifest};
40use crate::region::options::RegionOptions;
41use crate::region::version::VersionControlRef;
42use crate::region::{MitoRegionRef, RegionLeaderState, RegionRoleState};
43use crate::request::{
44 BackgroundNotify, BuildIndexRequest, OptionOutputTx, RegionChangeResult, RegionEditRequest,
45 RegionEditResult, RegionSyncRequest, TruncateResult, WorkerRequest, WorkerRequestWithTime,
46};
47use crate::sst::index::IndexBuildType;
48use crate::sst::location;
49use crate::worker::{RegionWorkerLoop, WorkerListener};
50
/// Pending region edit requests, grouped per region: maps a [`RegionId`] to that region's
/// [`RegionEditQueue`].
pub(crate) type RegionEditQueues = HashMap<RegionId, RegionEditQueue>;
52
/// A FIFO queue of [`RegionEditRequest`]s that are waiting to be applied to one region.
///
/// Requests are pushed to the back with `enqueue` and taken from the front with `dequeue`.
pub(crate) struct RegionEditQueue {
    /// Id of the region the queued requests target; used when replying to waiters with errors.
    region_id: RegionId,
    /// The queued requests, oldest first.
    requests: VecDeque<RegionEditRequest>,
}
64
65impl RegionEditQueue {
66 const QUEUE_MAX_LEN: usize = 128;
67
68 fn new(region_id: RegionId) -> Self {
69 Self {
70 region_id,
71 requests: VecDeque::new(),
72 }
73 }
74
75 fn enqueue(&mut self, request: RegionEditRequest) {
76 if self.requests.len() > Self::QUEUE_MAX_LEN {
77 request.waiters.reply_with(|| {
78 RegionBusySnafu {
79 region_id: self.region_id,
80 }
81 .fail()
82 });
83 return;
84 };
85 self.requests.push_back(request);
86 }
87
88 fn dequeue(&mut self) -> Option<RegionEditRequest> {
89 fn can_merge(edit: &RegionEdit) -> bool {
90 edit.files_to_add.iter().all(|f| f.sequence.is_none())
100 && edit.files_to_remove.is_empty()
101 && edit.timestamp_ms.is_none()
102 && edit.compaction_time_window.is_none()
103 && edit.flushed_entry_id.is_none()
104 && edit.flushed_sequence.is_none()
105 && edit.committed_sequence.is_none()
106 }
107
108 let mut merged = self.requests.pop_front()?;
109 if !can_merge(&merged.edit) {
110 return Some(merged);
111 }
112
113 while let Some(request) = self
114 .requests
115 .pop_front_if(|request| can_merge(&request.edit))
116 {
117 merged.edit.files_to_add.extend(request.edit.files_to_add);
118 merged.waiters.merge(request.waiters);
119 }
120 debug!(
121 "the files to add: [{}] are merged in one edit",
122 merged
123 .edit
124 .files_to_add
125 .iter()
126 .map(|x| x.file_id.to_string())
127 .collect::<Vec<_>>()
128 .join(", ")
129 );
130 Some(merged)
131 }
132
133 fn is_empty(&self) -> bool {
134 self.requests.is_empty()
135 }
136
137 fn reject_all_as_not_found(mut self) {
138 while let Some(request) = self.requests.pop_front() {
139 request.waiters.reply_with(|| {
140 RegionNotFoundSnafu {
141 region_id: self.region_id,
142 }
143 .fail()
144 });
145 }
146 }
147}
148
149impl<S: LogStore> RegionWorkerLoop<S> {
150 pub(crate) fn reject_region_edit_queue_as_not_found(&mut self, region_id: RegionId) {
152 if let Some(edit_queue) = self.region_edit_queues.remove(®ion_id) {
153 edit_queue.reject_all_as_not_found();
154 }
155 }
156
157 pub(crate) async fn handle_manifest_region_change_result(
159 &mut self,
160 change_result: RegionChangeResult,
161 ) {
162 let region = match self.regions.get_region(change_result.region_id) {
163 Some(region) => region,
164 None => {
165 self.reject_region_stalled_requests(&change_result.region_id);
166 change_result.sender.send(
167 RegionNotFoundSnafu {
168 region_id: change_result.region_id,
169 }
170 .fail(),
171 );
172 return;
173 }
174 };
175
176 if change_result.result.is_ok() {
177 Self::update_region_version(
179 ®ion.version_control,
180 change_result.new_meta,
181 change_result.new_options,
182 &self.memtable_builder_provider,
183 );
184 }
185
186 region.switch_state_to_writable(RegionLeaderState::Altering);
188 change_result.sender.send(change_result.result.map(|_| 0));
190
191 if self.config.index.build_mode == IndexBuildMode::Async && change_result.need_index {
193 self.handle_rebuild_index(
194 BuildIndexRequest {
195 region_id: region.region_id,
196 build_type: IndexBuildType::SchemaChange,
197 file_metas: Vec::new(),
198 },
199 OptionOutputTx::new(None),
200 )
201 .await;
202 }
203 self.handle_region_stalled_requests(&change_result.region_id, true)
205 .await;
206 }
207
208 pub(crate) async fn handle_region_sync(&mut self, request: RegionSyncRequest) {
213 let region_id = request.region_id;
214 let sender = request.sender;
215 let region = match self.regions.follower_region(region_id) {
216 Ok(region) => region,
217 Err(e) => {
218 let _ = sender.send(Err(e));
219 return;
220 }
221 };
222
223 let original_manifest_version = region.manifest_ctx.manifest_version().await;
224 let manifest = match region
225 .manifest_ctx
226 .install_manifest_to(request.manifest_version)
227 .await
228 {
229 Ok(manifest) => manifest,
230 Err(e) => {
231 let _ = sender.send(Err(e));
232 return;
233 }
234 };
235 let version = region.version();
236 let mut region_options = version.options.clone();
237 let old_format = region_options.sst_format.unwrap_or_default();
238 sanitize_region_options(&manifest, &mut region_options);
240 if !version.memtables.is_empty() {
241 let current = region.version_control.current();
242 warn!(
243 "Region {} memtables is not empty, which should not happen, manifest version: {}, last entry id: {}",
244 region.region_id, manifest.manifest_version, current.last_entry_id
245 );
246 }
247
248 let memtable_builder = if old_format != region_options.sst_format.unwrap_or_default() {
250 Some(
252 self.memtable_builder_provider
253 .builder_for_options(®ion_options),
254 )
255 } else {
256 None
257 };
258 let new_mutable = Arc::new(
259 region
260 .version()
261 .memtables
262 .mutable
263 .new_with_part_duration(version.compaction_time_window, memtable_builder),
264 );
265 let metadata = manifest.metadata.clone();
267
268 let version_builder = version_builder_from_manifest(
269 &manifest,
270 metadata,
271 region.file_purger.clone(),
272 new_mutable,
273 region_options,
274 );
275 let version = version_builder.build();
276 region.version_control.overwrite_current(Arc::new(version));
277
278 let updated = manifest.manifest_version > original_manifest_version;
279 let _ = sender.send(Ok((manifest.manifest_version, updated)));
280 }
281}
282
283impl<S: LogStore> RegionWorkerLoop<S> {
284 pub(crate) fn handle_region_edit(&mut self, request: RegionEditRequest) {
286 let region_id = request.region_id;
287 let Some(region) = self.regions.get_region(region_id) else {
288 request
289 .waiters
290 .reply_with(|| RegionNotFoundSnafu { region_id }.fail());
291 return;
292 };
293
294 if !region.is_writable() {
295 if region.state() == RegionRoleState::Leader(RegionLeaderState::Editing) {
296 self.region_edit_queues
297 .entry(region_id)
298 .or_insert_with(|| RegionEditQueue::new(region_id))
299 .enqueue(request);
300 } else {
301 request
302 .waiters
303 .reply_with(|| RegionBusySnafu { region_id }.fail());
304 }
305 return;
306 }
307
308 let RegionEditRequest {
309 region_id: _,
310 mut edit,
311 waiters,
312 preload_sst_cache,
313 } = request;
314 let file_sequence = region.version_control.committed_sequence() + 1;
315 edit.committed_sequence = Some(file_sequence);
316
317 for file in &mut edit.files_to_add {
319 file.sequence = NonZeroU64::new(file_sequence);
320 }
321
322 let is_staging = region.is_staging();
324 let expect_state = if is_staging {
325 RegionLeaderState::Staging
326 } else {
327 RegionLeaderState::Writable
328 };
329 if let Err(e) = region.set_editing(expect_state) {
331 let e = Arc::new(e);
332 waiters.reply_with(|| Err(e.clone()).context(EditRegionSnafu { region_id }));
333 return;
334 }
335
336 let request_sender = self.sender.clone();
337 let cache_manager = self.cache_manager.clone();
338 let listener = self.listener.clone();
339 common_runtime::spawn_global(async move {
342 let result = edit_region(
343 ®ion,
344 edit.clone(),
345 cache_manager,
346 listener,
347 is_staging,
348 preload_sst_cache,
349 )
350 .await
351 .map_err(Arc::new);
352 let notify = WorkerRequest::Background {
353 region_id,
354 notify: BackgroundNotify::RegionEdit(RegionEditResult {
355 region_id,
356 waiters,
357 edit,
358 result,
359 update_region_state: true,
361 is_staging,
362 }),
363 };
364
365 if let Err(res) = request_sender
367 .send(WorkerRequestWithTime::new(notify))
368 .await
369 {
370 warn!(
371 "Failed to send region edit result back to the worker, region_id: {}, res: {:?}",
372 region_id, res
373 );
374 }
375 });
376 }
377
378 pub(crate) async fn handle_region_edit_result(&mut self, edit_result: RegionEditResult) {
380 let region = match self.regions.get_region(edit_result.region_id) {
381 Some(region) => region,
382 None => {
383 self.fail_region_stalled_requests_as_not_found(&edit_result.region_id);
386 self.reject_region_edit_queue_as_not_found(edit_result.region_id);
387
388 edit_result.waiters.reply_with(|| {
389 RegionNotFoundSnafu {
390 region_id: edit_result.region_id,
391 }
392 .fail()
393 });
394 return;
395 }
396 };
397
398 let need_compaction = if edit_result.is_staging {
399 if edit_result.update_region_state {
400 region.switch_state_to_staging(RegionLeaderState::Editing);
403 }
404
405 false
406 } else {
407 let need_compaction = self.config.schedule_compaction_after_edit
408 && edit_result.result.is_ok()
409 && !edit_result.edit.files_to_add.is_empty();
410
411 if edit_result.result.is_ok() {
413 region.version_control.apply_edit(
415 Some(edit_result.edit),
416 &[],
417 region.file_purger.clone(),
418 );
419 }
420 if edit_result.update_region_state {
421 region.switch_state_to_writable(RegionLeaderState::Editing);
422 }
423
424 need_compaction
425 };
426
427 edit_result
428 .waiters
429 .reply_with(|| match &edit_result.result {
430 Ok(()) => Ok(()),
431 Err(e) => Err(e.clone()).context(EditRegionSnafu {
432 region_id: edit_result.region_id,
433 }),
434 });
435
436 if edit_result.update_region_state {
437 self.handle_region_stalled_requests(&edit_result.region_id, false)
440 .await;
441 }
442
443 let next_request =
444 if let Some(edit_queue) = self.region_edit_queues.get_mut(&edit_result.region_id) {
445 let request = edit_queue.dequeue();
446 if edit_queue.is_empty() {
447 self.region_edit_queues.remove(&edit_result.region_id);
448 }
449 request
450 } else {
451 None
452 };
453 if let Some(request) = next_request {
454 self.handle_region_edit(request);
455 }
456
457 if need_compaction {
458 self.schedule_compaction(®ion).await;
459 }
460 }
461
462 pub(crate) fn handle_manifest_truncate_action(
464 &self,
465 region: MitoRegionRef,
466 truncate: RegionTruncate,
467 sender: OptionOutputTx,
468 ) {
469 if let Err(e) = region.set_truncating() {
472 sender.send(Err(e));
473 return;
474 }
475 let request_sender = self.sender.clone();
478 let manifest_ctx = region.manifest_ctx.clone();
479 let is_staging = region.is_staging();
480
481 common_runtime::spawn_global(async move {
483 let action_list =
485 RegionMetaActionList::with_action(RegionMetaAction::Truncate(truncate.clone()));
486
487 let result = manifest_ctx
488 .update_manifest(RegionLeaderState::Truncating, action_list, is_staging)
489 .await
490 .map(|_| ());
491
492 let truncate_result = TruncateResult {
494 region_id: truncate.region_id,
495 sender,
496 result,
497 kind: truncate.kind,
498 };
499 let _ = request_sender
500 .send(WorkerRequestWithTime::new(WorkerRequest::Background {
501 region_id: truncate.region_id,
502 notify: BackgroundNotify::Truncate(truncate_result),
503 }))
504 .await
505 .inspect_err(|_| warn!("failed to send truncate result"));
506 });
507 }
508
509 pub(crate) fn handle_manifest_region_change(
511 &self,
512 region: MitoRegionRef,
513 change: RegionChange,
514 need_index: bool,
515 new_options: Option<RegionOptions>,
516 sender: OptionOutputTx,
517 ) {
518 if let Err(e) = region.set_altering() {
520 sender.send(Err(e));
521 return;
522 }
523 let listener = self.listener.clone();
524 let request_sender = self.sender.clone();
525 let is_staging = region.is_staging();
526 common_runtime::spawn_global(async move {
528 let new_meta = change.metadata.clone();
529 let action_list = RegionMetaActionList::with_action(RegionMetaAction::Change(change));
530
531 let result = region
532 .manifest_ctx
533 .update_manifest(RegionLeaderState::Altering, action_list, is_staging)
534 .await
535 .map(|_| ());
536 let notify = WorkerRequest::Background {
537 region_id: region.region_id,
538 notify: BackgroundNotify::RegionChange(RegionChangeResult {
539 region_id: region.region_id,
540 sender,
541 result,
542 new_meta,
543 need_index,
544 new_options,
545 }),
546 };
547 listener
548 .on_notify_region_change_result_begin(region.region_id)
549 .await;
550
551 if let Err(res) = request_sender
552 .send(WorkerRequestWithTime::new(notify))
553 .await
554 {
555 warn!(
556 "Failed to send region change result back to the worker, region_id: {}, res: {:?}",
557 region.region_id, res
558 );
559 }
560 });
561 }
562
563 fn update_region_version(
564 version_control: &VersionControlRef,
565 new_meta: RegionMetadataRef,
566 new_options: Option<RegionOptions>,
567 memtable_builder_provider: &MemtableBuilderProvider,
568 ) {
569 let options_changed = new_options.is_some();
570 let region_id = new_meta.region_id;
571 if let Some(new_options) = new_options {
572 let new_memtable_builder = memtable_builder_provider.builder_for_options(&new_options);
575 version_control.alter_schema_and_format(new_meta, new_options, new_memtable_builder);
576 } else {
577 version_control.alter_schema(new_meta);
579 }
580
581 let version_data = version_control.current();
582 let version = version_data.version;
583 info!(
584 "Region {} is altered, metadata is {:?}, options: {:?}, options_changed: {}",
585 region_id, version.metadata, version.options, options_changed,
586 );
587 }
588}
589
590async fn edit_region(
592 region: &MitoRegionRef,
593 edit: RegionEdit,
594 cache_manager: CacheManagerRef,
595 listener: WorkerListener,
596 is_staging: bool,
597 preload_sst_cache: bool,
598) -> Result<()> {
599 let region_id = region.region_id;
600 if let Some(write_cache) = cache_manager.write_cache()
601 && preload_sst_cache
602 {
603 for file_meta in &edit.files_to_add {
604 let write_cache = write_cache.clone();
605 let layer = region.access_layer.clone();
606 let listener = listener.clone();
607
608 let index_key = IndexKey::new(region_id, file_meta.file_id, FileType::Parquet);
609 let remote_path =
610 location::sst_file_path(layer.table_dir(), file_meta.file_id(), layer.path_type());
611
612 let is_index_exist = file_meta.exists_index();
613 let index_file_size = file_meta.index_file_size();
614
615 let index_file_index_key = IndexKey::new(
616 region_id,
617 file_meta.index_id().file_id.file_id(),
618 FileType::Puffin(file_meta.index_version),
619 );
620 let index_remote_path = location::index_file_path(
621 layer.table_dir(),
622 file_meta.index_id(),
623 layer.path_type(),
624 );
625
626 let file_size = file_meta.file_size;
627 common_runtime::spawn_global(async move {
628 WRITE_CACHE_INFLIGHT_DOWNLOAD.add(1);
629
630 let parquet_cached = write_cache
631 .download_if_absent(index_key, &remote_path, layer.object_store(), file_size)
632 .await;
633
634 if parquet_cached.is_ok() {
635 let mut cache_metrics = Default::default();
638 let _ = write_cache
639 .file_cache()
640 .get_parquet_meta_data(
641 index_key,
642 &mut cache_metrics,
643 PageIndexPolicy::Optional,
644 )
645 .await;
646
647 if matches!(parquet_cached, Ok(true)) {
648 listener.on_file_cache_filled(index_key.file_id);
649 }
650 }
651 if is_index_exist {
652 if let Err(err) = write_cache
654 .download(
655 index_file_index_key,
656 &index_remote_path,
657 layer.object_store(),
658 index_file_size,
659 )
660 .await
661 {
662 common_telemetry::error!(
663 err; "Failed to download puffin file, region_id: {}, index_file_index_key: {:?}, index_remote_path: {}", region_id, index_file_index_key, index_remote_path
664 );
665 }
666 }
667
668 WRITE_CACHE_INFLIGHT_DOWNLOAD.sub(1);
669 });
670 }
671 }
672
673 info!(
674 "Applying {edit:?} to region {}, is_staging: {}",
675 region_id, is_staging
676 );
677
678 let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit));
679 region
680 .manifest_ctx
681 .update_manifest(RegionLeaderState::Editing, action_list, is_staging)
682 .await
683 .map(|_| ())
684}