1use std::collections::{HashMap, VecDeque};
20use std::num::NonZeroU64;
21use std::sync::Arc;
22
23use common_telemetry::{info, warn};
24use parquet::file::metadata::PageIndexPolicy;
25use store_api::logstore::LogStore;
26use store_api::metadata::RegionMetadataRef;
27use store_api::storage::RegionId;
28
29use crate::cache::CacheManagerRef;
30use crate::cache::file_cache::{FileType, IndexKey};
31use crate::config::IndexBuildMode;
32use crate::error::{RegionBusySnafu, RegionNotFoundSnafu, Result};
33use crate::manifest::action::{
34 RegionChange, RegionEdit, RegionMetaAction, RegionMetaActionList, RegionTruncate,
35};
36use crate::memtable::MemtableBuilderProvider;
37use crate::metrics::WRITE_CACHE_INFLIGHT_DOWNLOAD;
38use crate::region::opener::{sanitize_region_options, version_builder_from_manifest};
39use crate::region::options::RegionOptions;
40use crate::region::version::VersionControlRef;
41use crate::region::{MitoRegionRef, RegionLeaderState, RegionRoleState};
42use crate::request::{
43 BackgroundNotify, BuildIndexRequest, OptionOutputTx, RegionChangeResult, RegionEditRequest,
44 RegionEditResult, RegionSyncRequest, TruncateResult, WorkerRequest, WorkerRequestWithTime,
45};
46use crate::sst::index::IndexBuildType;
47use crate::sst::location;
48use crate::worker::{RegionWorkerLoop, WorkerListener};
49
50pub(crate) type RegionEditQueues = HashMap<RegionId, RegionEditQueue>;
51
52pub(crate) struct RegionEditQueue {
60 region_id: RegionId,
61 requests: VecDeque<RegionEditRequest>,
62}
63
64impl RegionEditQueue {
65 const QUEUE_MAX_LEN: usize = 128;
66
67 fn new(region_id: RegionId) -> Self {
68 Self {
69 region_id,
70 requests: VecDeque::new(),
71 }
72 }
73
74 fn enqueue(&mut self, request: RegionEditRequest) {
75 if self.requests.len() > Self::QUEUE_MAX_LEN {
76 let _ = request.tx.send(
77 RegionBusySnafu {
78 region_id: self.region_id,
79 }
80 .fail(),
81 );
82 return;
83 };
84 self.requests.push_back(request);
85 }
86
87 fn dequeue(&mut self) -> Option<RegionEditRequest> {
88 self.requests.pop_front()
89 }
90
91 fn is_empty(&self) -> bool {
92 self.requests.is_empty()
93 }
94
95 fn reject_all_as_not_found(mut self) {
96 while let Some(request) = self.requests.pop_front() {
97 let _ = request.tx.send(
98 RegionNotFoundSnafu {
99 region_id: self.region_id,
100 }
101 .fail(),
102 );
103 }
104 }
105}
106
107impl<S: LogStore> RegionWorkerLoop<S> {
108 pub(crate) fn reject_region_edit_queue_as_not_found(&mut self, region_id: RegionId) {
110 if let Some(edit_queue) = self.region_edit_queues.remove(®ion_id) {
111 edit_queue.reject_all_as_not_found();
112 }
113 }
114
115 pub(crate) async fn handle_manifest_region_change_result(
117 &mut self,
118 change_result: RegionChangeResult,
119 ) {
120 let region = match self.regions.get_region(change_result.region_id) {
121 Some(region) => region,
122 None => {
123 self.reject_region_stalled_requests(&change_result.region_id);
124 change_result.sender.send(
125 RegionNotFoundSnafu {
126 region_id: change_result.region_id,
127 }
128 .fail(),
129 );
130 return;
131 }
132 };
133
134 if change_result.result.is_ok() {
135 Self::update_region_version(
137 ®ion.version_control,
138 change_result.new_meta,
139 change_result.new_options,
140 &self.memtable_builder_provider,
141 );
142 }
143
144 region.switch_state_to_writable(RegionLeaderState::Altering);
146 change_result.sender.send(change_result.result.map(|_| 0));
148
149 if self.config.index.build_mode == IndexBuildMode::Async && change_result.need_index {
151 self.handle_rebuild_index(
152 BuildIndexRequest {
153 region_id: region.region_id,
154 build_type: IndexBuildType::SchemaChange,
155 file_metas: Vec::new(),
156 },
157 OptionOutputTx::new(None),
158 )
159 .await;
160 }
161 self.handle_region_stalled_requests(&change_result.region_id, true)
163 .await;
164 }
165
166 pub(crate) async fn handle_region_sync(&mut self, request: RegionSyncRequest) {
171 let region_id = request.region_id;
172 let sender = request.sender;
173 let region = match self.regions.follower_region(region_id) {
174 Ok(region) => region,
175 Err(e) => {
176 let _ = sender.send(Err(e));
177 return;
178 }
179 };
180
181 let original_manifest_version = region.manifest_ctx.manifest_version().await;
182 let manifest = match region
183 .manifest_ctx
184 .install_manifest_to(request.manifest_version)
185 .await
186 {
187 Ok(manifest) => manifest,
188 Err(e) => {
189 let _ = sender.send(Err(e));
190 return;
191 }
192 };
193 let version = region.version();
194 let mut region_options = version.options.clone();
195 let old_format = region_options.sst_format.unwrap_or_default();
196 sanitize_region_options(&manifest, &mut region_options);
198 if !version.memtables.is_empty() {
199 let current = region.version_control.current();
200 warn!(
201 "Region {} memtables is not empty, which should not happen, manifest version: {}, last entry id: {}",
202 region.region_id, manifest.manifest_version, current.last_entry_id
203 );
204 }
205
206 let memtable_builder = if old_format != region_options.sst_format.unwrap_or_default() {
208 Some(
210 self.memtable_builder_provider
211 .builder_for_options(®ion_options),
212 )
213 } else {
214 None
215 };
216 let new_mutable = Arc::new(
217 region
218 .version()
219 .memtables
220 .mutable
221 .new_with_part_duration(version.compaction_time_window, memtable_builder),
222 );
223 let metadata = manifest.metadata.clone();
225
226 let version_builder = version_builder_from_manifest(
227 &manifest,
228 metadata,
229 region.file_purger.clone(),
230 new_mutable,
231 region_options,
232 );
233 let version = version_builder.build();
234 region.version_control.overwrite_current(Arc::new(version));
235
236 let updated = manifest.manifest_version > original_manifest_version;
237 let _ = sender.send(Ok((manifest.manifest_version, updated)));
238 }
239}
240
241impl<S: LogStore> RegionWorkerLoop<S> {
242 pub(crate) fn handle_region_edit(&mut self, request: RegionEditRequest) {
244 let region_id = request.region_id;
245 let Some(region) = self.regions.get_region(region_id) else {
246 let _ = request.tx.send(RegionNotFoundSnafu { region_id }.fail());
247 return;
248 };
249
250 if !region.is_writable() {
251 if region.state() == RegionRoleState::Leader(RegionLeaderState::Editing) {
252 self.region_edit_queues
253 .entry(region_id)
254 .or_insert_with(|| RegionEditQueue::new(region_id))
255 .enqueue(request);
256 } else {
257 let _ = request.tx.send(RegionBusySnafu { region_id }.fail());
258 }
259 return;
260 }
261
262 let RegionEditRequest {
263 region_id: _,
264 mut edit,
265 tx: sender,
266 } = request;
267 let file_sequence = region.version_control.committed_sequence() + 1;
268 edit.committed_sequence = Some(file_sequence);
269
270 for file in &mut edit.files_to_add {
272 file.sequence = NonZeroU64::new(file_sequence);
273 }
274
275 let is_staging = region.is_staging();
277 let expect_state = if is_staging {
278 RegionLeaderState::Staging
279 } else {
280 RegionLeaderState::Writable
281 };
282 if let Err(e) = region.set_editing(expect_state) {
284 let _ = sender.send(Err(e));
285 return;
286 }
287
288 let request_sender = self.sender.clone();
289 let cache_manager = self.cache_manager.clone();
290 let listener = self.listener.clone();
291 common_runtime::spawn_global(async move {
294 let result =
295 edit_region(®ion, edit.clone(), cache_manager, listener, is_staging).await;
296 let notify = WorkerRequest::Background {
297 region_id,
298 notify: BackgroundNotify::RegionEdit(RegionEditResult {
299 region_id,
300 sender,
301 edit,
302 result,
303 update_region_state: true,
305 is_staging,
306 }),
307 };
308
309 if let Err(res) = request_sender
311 .send(WorkerRequestWithTime::new(notify))
312 .await
313 {
314 warn!(
315 "Failed to send region edit result back to the worker, region_id: {}, res: {:?}",
316 region_id, res
317 );
318 }
319 });
320 }
321
322 pub(crate) async fn handle_region_edit_result(&mut self, edit_result: RegionEditResult) {
324 let region = match self.regions.get_region(edit_result.region_id) {
325 Some(region) => region,
326 None => {
327 self.fail_region_stalled_requests_as_not_found(&edit_result.region_id);
330 self.reject_region_edit_queue_as_not_found(edit_result.region_id);
331 let _ = edit_result.sender.send(
332 RegionNotFoundSnafu {
333 region_id: edit_result.region_id,
334 }
335 .fail(),
336 );
337 return;
338 }
339 };
340
341 let need_compaction = if edit_result.is_staging {
342 if edit_result.update_region_state {
343 region.switch_state_to_staging(RegionLeaderState::Editing);
346 }
347
348 false
349 } else {
350 let need_compaction =
351 edit_result.result.is_ok() && !edit_result.edit.files_to_add.is_empty();
352 if edit_result.result.is_ok() {
354 region.version_control.apply_edit(
356 Some(edit_result.edit),
357 &[],
358 region.file_purger.clone(),
359 );
360 }
361 if edit_result.update_region_state {
362 region.switch_state_to_writable(RegionLeaderState::Editing);
363 }
364
365 need_compaction
366 };
367
368 let _ = edit_result.sender.send(edit_result.result);
369
370 if edit_result.update_region_state {
371 self.handle_region_stalled_requests(&edit_result.region_id, false)
374 .await;
375 }
376
377 let next_request =
378 if let Some(edit_queue) = self.region_edit_queues.get_mut(&edit_result.region_id) {
379 let request = edit_queue.dequeue();
380 if edit_queue.is_empty() {
381 self.region_edit_queues.remove(&edit_result.region_id);
382 }
383 request
384 } else {
385 None
386 };
387 if let Some(request) = next_request {
388 self.handle_region_edit(request);
389 }
390
391 if need_compaction {
392 self.schedule_compaction(®ion).await;
393 }
394 }
395
396 pub(crate) fn handle_manifest_truncate_action(
398 &self,
399 region: MitoRegionRef,
400 truncate: RegionTruncate,
401 sender: OptionOutputTx,
402 ) {
403 if let Err(e) = region.set_truncating() {
406 sender.send(Err(e));
407 return;
408 }
409 let request_sender = self.sender.clone();
412 let manifest_ctx = region.manifest_ctx.clone();
413 let is_staging = region.is_staging();
414
415 common_runtime::spawn_global(async move {
417 let action_list =
419 RegionMetaActionList::with_action(RegionMetaAction::Truncate(truncate.clone()));
420
421 let result = manifest_ctx
422 .update_manifest(RegionLeaderState::Truncating, action_list, is_staging)
423 .await
424 .map(|_| ());
425
426 let truncate_result = TruncateResult {
428 region_id: truncate.region_id,
429 sender,
430 result,
431 kind: truncate.kind,
432 };
433 let _ = request_sender
434 .send(WorkerRequestWithTime::new(WorkerRequest::Background {
435 region_id: truncate.region_id,
436 notify: BackgroundNotify::Truncate(truncate_result),
437 }))
438 .await
439 .inspect_err(|_| warn!("failed to send truncate result"));
440 });
441 }
442
443 pub(crate) fn handle_manifest_region_change(
445 &self,
446 region: MitoRegionRef,
447 change: RegionChange,
448 need_index: bool,
449 new_options: Option<RegionOptions>,
450 sender: OptionOutputTx,
451 ) {
452 if let Err(e) = region.set_altering() {
454 sender.send(Err(e));
455 return;
456 }
457 let listener = self.listener.clone();
458 let request_sender = self.sender.clone();
459 let is_staging = region.is_staging();
460 common_runtime::spawn_global(async move {
462 let new_meta = change.metadata.clone();
463 let action_list = RegionMetaActionList::with_action(RegionMetaAction::Change(change));
464
465 let result = region
466 .manifest_ctx
467 .update_manifest(RegionLeaderState::Altering, action_list, is_staging)
468 .await
469 .map(|_| ());
470 let notify = WorkerRequest::Background {
471 region_id: region.region_id,
472 notify: BackgroundNotify::RegionChange(RegionChangeResult {
473 region_id: region.region_id,
474 sender,
475 result,
476 new_meta,
477 need_index,
478 new_options,
479 }),
480 };
481 listener
482 .on_notify_region_change_result_begin(region.region_id)
483 .await;
484
485 if let Err(res) = request_sender
486 .send(WorkerRequestWithTime::new(notify))
487 .await
488 {
489 warn!(
490 "Failed to send region change result back to the worker, region_id: {}, res: {:?}",
491 region.region_id, res
492 );
493 }
494 });
495 }
496
497 fn update_region_version(
498 version_control: &VersionControlRef,
499 new_meta: RegionMetadataRef,
500 new_options: Option<RegionOptions>,
501 memtable_builder_provider: &MemtableBuilderProvider,
502 ) {
503 let options_changed = new_options.is_some();
504 let region_id = new_meta.region_id;
505 if let Some(new_options) = new_options {
506 let new_memtable_builder = memtable_builder_provider.builder_for_options(&new_options);
509 version_control.alter_schema_and_format(new_meta, new_options, new_memtable_builder);
510 } else {
511 version_control.alter_schema(new_meta);
513 }
514
515 let version_data = version_control.current();
516 let version = version_data.version;
517 info!(
518 "Region {} is altered, metadata is {:?}, options: {:?}, options_changed: {}",
519 region_id, version.metadata, version.options, options_changed,
520 );
521 }
522}
523
524async fn edit_region(
526 region: &MitoRegionRef,
527 edit: RegionEdit,
528 cache_manager: CacheManagerRef,
529 listener: WorkerListener,
530 is_staging: bool,
531) -> Result<()> {
532 let region_id = region.region_id;
533 if let Some(write_cache) = cache_manager.write_cache() {
534 for file_meta in &edit.files_to_add {
535 let write_cache = write_cache.clone();
536 let layer = region.access_layer.clone();
537 let listener = listener.clone();
538
539 let index_key = IndexKey::new(region_id, file_meta.file_id, FileType::Parquet);
540 let remote_path =
541 location::sst_file_path(layer.table_dir(), file_meta.file_id(), layer.path_type());
542
543 let is_index_exist = file_meta.exists_index();
544 let index_file_size = file_meta.index_file_size();
545
546 let index_file_index_key = IndexKey::new(
547 region_id,
548 file_meta.index_id().file_id.file_id(),
549 FileType::Puffin(file_meta.index_version),
550 );
551 let index_remote_path = location::index_file_path(
552 layer.table_dir(),
553 file_meta.index_id(),
554 layer.path_type(),
555 );
556
557 let file_size = file_meta.file_size;
558 common_runtime::spawn_global(async move {
559 WRITE_CACHE_INFLIGHT_DOWNLOAD.add(1);
560
561 let parquet_cached = write_cache
562 .download_if_absent(index_key, &remote_path, layer.object_store(), file_size)
563 .await;
564
565 if parquet_cached.is_ok() {
566 let mut cache_metrics = Default::default();
569 let _ = write_cache
570 .file_cache()
571 .get_parquet_meta_data(
572 index_key,
573 &mut cache_metrics,
574 PageIndexPolicy::Optional,
575 )
576 .await;
577
578 if matches!(parquet_cached, Ok(true)) {
579 listener.on_file_cache_filled(index_key.file_id);
580 }
581 }
582 if is_index_exist {
583 if let Err(err) = write_cache
585 .download(
586 index_file_index_key,
587 &index_remote_path,
588 layer.object_store(),
589 index_file_size,
590 )
591 .await
592 {
593 common_telemetry::error!(
594 err; "Failed to download puffin file, region_id: {}, index_file_index_key: {:?}, index_remote_path: {}", region_id, index_file_index_key, index_remote_path
595 );
596 }
597 }
598
599 WRITE_CACHE_INFLIGHT_DOWNLOAD.sub(1);
600 });
601 }
602 }
603
604 info!(
605 "Applying {edit:?} to region {}, is_staging: {}",
606 region_id, is_staging
607 );
608
609 let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit));
610 region
611 .manifest_ctx
612 .update_manifest(RegionLeaderState::Editing, action_list, is_staging)
613 .await
614 .map(|_| ())
615}