1use std::collections::{HashMap, VecDeque};
20use std::num::NonZeroU64;
21use std::sync::Arc;
22
23use common_telemetry::{info, warn};
24use parquet::file::metadata::PageIndexPolicy;
25use store_api::logstore::LogStore;
26use store_api::metadata::RegionMetadataRef;
27use store_api::storage::RegionId;
28
29use crate::cache::CacheManagerRef;
30use crate::cache::file_cache::{FileType, IndexKey};
31use crate::config::IndexBuildMode;
32use crate::error::{RegionBusySnafu, RegionNotFoundSnafu, Result};
33use crate::manifest::action::{
34 RegionChange, RegionEdit, RegionMetaAction, RegionMetaActionList, RegionTruncate,
35};
36use crate::memtable::MemtableBuilderProvider;
37use crate::metrics::WRITE_CACHE_INFLIGHT_DOWNLOAD;
38use crate::region::opener::{sanitize_region_options, version_builder_from_manifest};
39use crate::region::options::RegionOptions;
40use crate::region::version::VersionControlRef;
41use crate::region::{MitoRegionRef, RegionLeaderState, RegionRoleState};
42use crate::request::{
43 BackgroundNotify, BuildIndexRequest, OptionOutputTx, RegionChangeResult, RegionEditRequest,
44 RegionEditResult, RegionSyncRequest, TruncateResult, WorkerRequest, WorkerRequestWithTime,
45};
46use crate::sst::index::IndexBuildType;
47use crate::sst::location;
48use crate::worker::{RegionWorkerLoop, WorkerListener};
49
50pub(crate) type RegionEditQueues = HashMap<RegionId, RegionEditQueue>;
51
52pub(crate) struct RegionEditQueue {
57 region_id: RegionId,
58 requests: VecDeque<RegionEditRequest>,
59}
60
61impl RegionEditQueue {
62 const QUEUE_MAX_LEN: usize = 128;
63
64 fn new(region_id: RegionId) -> Self {
65 Self {
66 region_id,
67 requests: VecDeque::new(),
68 }
69 }
70
71 fn enqueue(&mut self, request: RegionEditRequest) {
72 if self.requests.len() > Self::QUEUE_MAX_LEN {
73 let _ = request.tx.send(
74 RegionBusySnafu {
75 region_id: self.region_id,
76 }
77 .fail(),
78 );
79 return;
80 };
81 self.requests.push_back(request);
82 }
83
84 fn dequeue(&mut self) -> Option<RegionEditRequest> {
85 self.requests.pop_front()
86 }
87}
88
89impl<S: LogStore> RegionWorkerLoop<S> {
90 pub(crate) async fn handle_manifest_region_change_result(
92 &mut self,
93 change_result: RegionChangeResult,
94 ) {
95 let region = match self.regions.get_region(change_result.region_id) {
96 Some(region) => region,
97 None => {
98 self.reject_region_stalled_requests(&change_result.region_id);
99 change_result.sender.send(
100 RegionNotFoundSnafu {
101 region_id: change_result.region_id,
102 }
103 .fail(),
104 );
105 return;
106 }
107 };
108
109 if change_result.result.is_ok() {
110 Self::update_region_version(
112 ®ion.version_control,
113 change_result.new_meta,
114 change_result.new_options,
115 &self.memtable_builder_provider,
116 );
117 }
118
119 region.switch_state_to_writable(RegionLeaderState::Altering);
121 change_result.sender.send(change_result.result.map(|_| 0));
123
124 if self.config.index.build_mode == IndexBuildMode::Async && change_result.need_index {
126 self.handle_rebuild_index(
127 BuildIndexRequest {
128 region_id: region.region_id,
129 build_type: IndexBuildType::SchemaChange,
130 file_metas: Vec::new(),
131 },
132 OptionOutputTx::new(None),
133 )
134 .await;
135 }
136 self.handle_region_stalled_requests(&change_result.region_id)
138 .await;
139 }
140
141 pub(crate) async fn handle_region_sync(&mut self, request: RegionSyncRequest) {
146 let region_id = request.region_id;
147 let sender = request.sender;
148 let region = match self.regions.follower_region(region_id) {
149 Ok(region) => region,
150 Err(e) => {
151 let _ = sender.send(Err(e));
152 return;
153 }
154 };
155
156 let original_manifest_version = region.manifest_ctx.manifest_version().await;
157 let manifest = match region
158 .manifest_ctx
159 .install_manifest_to(request.manifest_version)
160 .await
161 {
162 Ok(manifest) => manifest,
163 Err(e) => {
164 let _ = sender.send(Err(e));
165 return;
166 }
167 };
168 let version = region.version();
169 let mut region_options = version.options.clone();
170 let old_format = region_options.sst_format.unwrap_or_default();
171 sanitize_region_options(&manifest, &mut region_options);
173 if !version.memtables.is_empty() {
174 let current = region.version_control.current();
175 warn!(
176 "Region {} memtables is not empty, which should not happen, manifest version: {}, last entry id: {}",
177 region.region_id, manifest.manifest_version, current.last_entry_id
178 );
179 }
180
181 let memtable_builder = if old_format != region_options.sst_format.unwrap_or_default() {
183 Some(
185 self.memtable_builder_provider
186 .builder_for_options(®ion_options),
187 )
188 } else {
189 None
190 };
191 let new_mutable = Arc::new(
192 region
193 .version()
194 .memtables
195 .mutable
196 .new_with_part_duration(version.compaction_time_window, memtable_builder),
197 );
198 let metadata = manifest.metadata.clone();
200
201 let version_builder = version_builder_from_manifest(
202 &manifest,
203 metadata,
204 region.file_purger.clone(),
205 new_mutable,
206 region_options,
207 );
208 let version = version_builder.build();
209 region.version_control.overwrite_current(Arc::new(version));
210
211 let updated = manifest.manifest_version > original_manifest_version;
212 let _ = sender.send(Ok((manifest.manifest_version, updated)));
213 }
214}
215
216impl<S> RegionWorkerLoop<S> {
217 pub(crate) fn handle_region_edit(&mut self, request: RegionEditRequest) {
219 let region_id = request.region_id;
220 let Some(region) = self.regions.get_region(region_id) else {
221 let _ = request.tx.send(RegionNotFoundSnafu { region_id }.fail());
222 return;
223 };
224
225 if !region.is_writable() {
226 if region.state() == RegionRoleState::Leader(RegionLeaderState::Editing) {
227 self.region_edit_queues
228 .entry(region_id)
229 .or_insert_with(|| RegionEditQueue::new(region_id))
230 .enqueue(request);
231 } else {
232 let _ = request.tx.send(RegionBusySnafu { region_id }.fail());
233 }
234 return;
235 }
236
237 let RegionEditRequest {
238 region_id: _,
239 mut edit,
240 tx: sender,
241 } = request;
242 let file_sequence = region.version_control.committed_sequence() + 1;
243 edit.committed_sequence = Some(file_sequence);
244
245 for file in &mut edit.files_to_add {
247 file.sequence = NonZeroU64::new(file_sequence);
248 }
249
250 let is_staging = region.is_staging();
252 let expect_state = if is_staging {
253 RegionLeaderState::Staging
254 } else {
255 RegionLeaderState::Writable
256 };
257 if let Err(e) = region.set_editing(expect_state) {
259 let _ = sender.send(Err(e));
260 return;
261 }
262
263 let request_sender = self.sender.clone();
264 let cache_manager = self.cache_manager.clone();
265 let listener = self.listener.clone();
266 common_runtime::spawn_global(async move {
269 let result =
270 edit_region(®ion, edit.clone(), cache_manager, listener, is_staging).await;
271 let notify = WorkerRequest::Background {
272 region_id,
273 notify: BackgroundNotify::RegionEdit(RegionEditResult {
274 region_id,
275 sender,
276 edit,
277 result,
278 update_region_state: true,
280 is_staging,
281 }),
282 };
283
284 if let Err(res) = request_sender
286 .send(WorkerRequestWithTime::new(notify))
287 .await
288 {
289 warn!(
290 "Failed to send region edit result back to the worker, region_id: {}, res: {:?}",
291 region_id, res
292 );
293 }
294 });
295 }
296
297 pub(crate) async fn handle_region_edit_result(&mut self, edit_result: RegionEditResult) {
299 let region = match self.regions.get_region(edit_result.region_id) {
300 Some(region) => region,
301 None => {
302 let _ = edit_result.sender.send(
303 RegionNotFoundSnafu {
304 region_id: edit_result.region_id,
305 }
306 .fail(),
307 );
308 return;
309 }
310 };
311
312 let need_compaction = if edit_result.is_staging {
313 if edit_result.update_region_state {
314 region.switch_state_to_staging(RegionLeaderState::Editing);
317 }
318
319 false
320 } else {
321 let need_compaction =
322 edit_result.result.is_ok() && !edit_result.edit.files_to_add.is_empty();
323 if edit_result.result.is_ok() {
325 region.version_control.apply_edit(
327 Some(edit_result.edit),
328 &[],
329 region.file_purger.clone(),
330 );
331 }
332 if edit_result.update_region_state {
333 region.switch_state_to_writable(RegionLeaderState::Editing);
334 }
335
336 need_compaction
337 };
338
339 let _ = edit_result.sender.send(edit_result.result);
340
341 if let Some(edit_queue) = self.region_edit_queues.get_mut(&edit_result.region_id)
342 && let Some(request) = edit_queue.dequeue()
343 {
344 self.handle_region_edit(request);
345 }
346
347 if need_compaction {
348 self.schedule_compaction(®ion).await;
349 }
350 }
351
352 pub(crate) fn handle_manifest_truncate_action(
354 &self,
355 region: MitoRegionRef,
356 truncate: RegionTruncate,
357 sender: OptionOutputTx,
358 ) {
359 if let Err(e) = region.set_truncating() {
362 sender.send(Err(e));
363 return;
364 }
365 let request_sender = self.sender.clone();
368 let manifest_ctx = region.manifest_ctx.clone();
369 let is_staging = region.is_staging();
370
371 common_runtime::spawn_global(async move {
373 let action_list =
375 RegionMetaActionList::with_action(RegionMetaAction::Truncate(truncate.clone()));
376
377 let result = manifest_ctx
378 .update_manifest(RegionLeaderState::Truncating, action_list, is_staging)
379 .await
380 .map(|_| ());
381
382 let truncate_result = TruncateResult {
384 region_id: truncate.region_id,
385 sender,
386 result,
387 kind: truncate.kind,
388 };
389 let _ = request_sender
390 .send(WorkerRequestWithTime::new(WorkerRequest::Background {
391 region_id: truncate.region_id,
392 notify: BackgroundNotify::Truncate(truncate_result),
393 }))
394 .await
395 .inspect_err(|_| warn!("failed to send truncate result"));
396 });
397 }
398
399 pub(crate) fn handle_manifest_region_change(
401 &self,
402 region: MitoRegionRef,
403 change: RegionChange,
404 need_index: bool,
405 new_options: Option<RegionOptions>,
406 sender: OptionOutputTx,
407 ) {
408 if let Err(e) = region.set_altering() {
410 sender.send(Err(e));
411 return;
412 }
413 let listener = self.listener.clone();
414 let request_sender = self.sender.clone();
415 let is_staging = region.is_staging();
416 common_runtime::spawn_global(async move {
418 let new_meta = change.metadata.clone();
419 let action_list = RegionMetaActionList::with_action(RegionMetaAction::Change(change));
420
421 let result = region
422 .manifest_ctx
423 .update_manifest(RegionLeaderState::Altering, action_list, is_staging)
424 .await
425 .map(|_| ());
426 let notify = WorkerRequest::Background {
427 region_id: region.region_id,
428 notify: BackgroundNotify::RegionChange(RegionChangeResult {
429 region_id: region.region_id,
430 sender,
431 result,
432 new_meta,
433 need_index,
434 new_options,
435 }),
436 };
437 listener
438 .on_notify_region_change_result_begin(region.region_id)
439 .await;
440
441 if let Err(res) = request_sender
442 .send(WorkerRequestWithTime::new(notify))
443 .await
444 {
445 warn!(
446 "Failed to send region change result back to the worker, region_id: {}, res: {:?}",
447 region.region_id, res
448 );
449 }
450 });
451 }
452
453 fn update_region_version(
454 version_control: &VersionControlRef,
455 new_meta: RegionMetadataRef,
456 new_options: Option<RegionOptions>,
457 memtable_builder_provider: &MemtableBuilderProvider,
458 ) {
459 let options_changed = new_options.is_some();
460 let region_id = new_meta.region_id;
461 if let Some(new_options) = new_options {
462 let new_memtable_builder = memtable_builder_provider.builder_for_options(&new_options);
465 version_control.alter_schema_and_format(new_meta, new_options, new_memtable_builder);
466 } else {
467 version_control.alter_schema(new_meta);
469 }
470
471 let version_data = version_control.current();
472 let version = version_data.version;
473 info!(
474 "Region {} is altered, metadata is {:?}, options: {:?}, options_changed: {}",
475 region_id, version.metadata, version.options, options_changed,
476 );
477 }
478}
479
480async fn edit_region(
482 region: &MitoRegionRef,
483 edit: RegionEdit,
484 cache_manager: CacheManagerRef,
485 listener: WorkerListener,
486 is_staging: bool,
487) -> Result<()> {
488 let region_id = region.region_id;
489 if let Some(write_cache) = cache_manager.write_cache() {
490 for file_meta in &edit.files_to_add {
491 let write_cache = write_cache.clone();
492 let layer = region.access_layer.clone();
493 let listener = listener.clone();
494
495 let index_key = IndexKey::new(region_id, file_meta.file_id, FileType::Parquet);
496 let remote_path =
497 location::sst_file_path(layer.table_dir(), file_meta.file_id(), layer.path_type());
498
499 let is_index_exist = file_meta.exists_index();
500 let index_file_size = file_meta.index_file_size();
501
502 let index_file_index_key = IndexKey::new(
503 region_id,
504 file_meta.index_id().file_id.file_id(),
505 FileType::Puffin(file_meta.index_version),
506 );
507 let index_remote_path = location::index_file_path(
508 layer.table_dir(),
509 file_meta.index_id(),
510 layer.path_type(),
511 );
512
513 let file_size = file_meta.file_size;
514 common_runtime::spawn_global(async move {
515 WRITE_CACHE_INFLIGHT_DOWNLOAD.add(1);
516
517 if write_cache
518 .download(index_key, &remote_path, layer.object_store(), file_size)
519 .await
520 .is_ok()
521 {
522 let mut cache_metrics = Default::default();
525 let _ = write_cache
526 .file_cache()
527 .get_parquet_meta_data(
528 index_key,
529 &mut cache_metrics,
530 PageIndexPolicy::Optional,
531 )
532 .await;
533
534 listener.on_file_cache_filled(index_key.file_id);
535 }
536 if is_index_exist {
537 if let Err(err) = write_cache
539 .download(
540 index_file_index_key,
541 &index_remote_path,
542 layer.object_store(),
543 index_file_size,
544 )
545 .await
546 {
547 common_telemetry::error!(
548 err; "Failed to download puffin file, region_id: {}, index_file_index_key: {:?}, index_remote_path: {}", region_id, index_file_index_key, index_remote_path
549 );
550 }
551 }
552
553 WRITE_CACHE_INFLIGHT_DOWNLOAD.sub(1);
554 });
555 }
556 }
557
558 info!(
559 "Applying {edit:?} to region {}, is_staging: {}",
560 region_id, is_staging
561 );
562
563 let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit));
564 region
565 .manifest_ctx
566 .update_manifest(RegionLeaderState::Editing, action_list, is_staging)
567 .await
568 .map(|_| ())
569}