1use std::collections::{HashMap, VecDeque};
20use std::num::NonZeroU64;
21use std::sync::Arc;
22
23use common_telemetry::{info, warn};
24use store_api::logstore::LogStore;
25use store_api::metadata::RegionMetadataRef;
26use store_api::storage::RegionId;
27
28use crate::cache::CacheManagerRef;
29use crate::cache::file_cache::{FileType, IndexKey};
30use crate::config::IndexBuildMode;
31use crate::error::{RegionBusySnafu, RegionNotFoundSnafu, Result};
32use crate::manifest::action::{
33 RegionChange, RegionEdit, RegionMetaAction, RegionMetaActionList, RegionTruncate,
34};
35use crate::memtable::MemtableBuilderProvider;
36use crate::metrics::WRITE_CACHE_INFLIGHT_DOWNLOAD;
37use crate::region::opener::{sanitize_region_options, version_builder_from_manifest};
38use crate::region::options::RegionOptions;
39use crate::region::version::VersionControlRef;
40use crate::region::{MitoRegionRef, RegionLeaderState, RegionRoleState};
41use crate::request::{
42 BackgroundNotify, BuildIndexRequest, OptionOutputTx, RegionChangeResult, RegionEditRequest,
43 RegionEditResult, RegionSyncRequest, TruncateResult, WorkerRequest, WorkerRequestWithTime,
44};
45use crate::sst::index::IndexBuildType;
46use crate::sst::location;
47use crate::worker::{RegionWorkerLoop, WorkerListener};
48
49pub(crate) type RegionEditQueues = HashMap<RegionId, RegionEditQueue>;
50
51pub(crate) struct RegionEditQueue {
56 region_id: RegionId,
57 requests: VecDeque<RegionEditRequest>,
58}
59
60impl RegionEditQueue {
61 const QUEUE_MAX_LEN: usize = 128;
62
63 fn new(region_id: RegionId) -> Self {
64 Self {
65 region_id,
66 requests: VecDeque::new(),
67 }
68 }
69
70 fn enqueue(&mut self, request: RegionEditRequest) {
71 if self.requests.len() > Self::QUEUE_MAX_LEN {
72 let _ = request.tx.send(
73 RegionBusySnafu {
74 region_id: self.region_id,
75 }
76 .fail(),
77 );
78 return;
79 };
80 self.requests.push_back(request);
81 }
82
83 fn dequeue(&mut self) -> Option<RegionEditRequest> {
84 self.requests.pop_front()
85 }
86}
87
88impl<S: LogStore> RegionWorkerLoop<S> {
89 pub(crate) async fn handle_manifest_region_change_result(
91 &mut self,
92 change_result: RegionChangeResult,
93 ) {
94 let region = match self.regions.get_region(change_result.region_id) {
95 Some(region) => region,
96 None => {
97 self.reject_region_stalled_requests(&change_result.region_id);
98 change_result.sender.send(
99 RegionNotFoundSnafu {
100 region_id: change_result.region_id,
101 }
102 .fail(),
103 );
104 return;
105 }
106 };
107
108 if change_result.result.is_ok() {
109 Self::update_region_version(
111 ®ion.version_control,
112 change_result.new_meta,
113 change_result.new_options,
114 &self.memtable_builder_provider,
115 );
116 }
117
118 region.switch_state_to_writable(RegionLeaderState::Altering);
120 change_result.sender.send(change_result.result.map(|_| 0));
122
123 if self.config.index.build_mode == IndexBuildMode::Async && change_result.need_index {
125 self.handle_rebuild_index(
126 BuildIndexRequest {
127 region_id: region.region_id,
128 build_type: IndexBuildType::SchemaChange,
129 file_metas: Vec::new(),
130 },
131 OptionOutputTx::new(None),
132 )
133 .await;
134 }
135 self.handle_region_stalled_requests(&change_result.region_id)
137 .await;
138 }
139
140 pub(crate) async fn handle_region_sync(&mut self, request: RegionSyncRequest) {
145 let region_id = request.region_id;
146 let sender = request.sender;
147 let region = match self.regions.follower_region(region_id) {
148 Ok(region) => region,
149 Err(e) => {
150 let _ = sender.send(Err(e));
151 return;
152 }
153 };
154
155 let original_manifest_version = region.manifest_ctx.manifest_version().await;
156 let manifest = match region
157 .manifest_ctx
158 .install_manifest_to(request.manifest_version)
159 .await
160 {
161 Ok(manifest) => manifest,
162 Err(e) => {
163 let _ = sender.send(Err(e));
164 return;
165 }
166 };
167 let version = region.version();
168 let mut region_options = version.options.clone();
169 let old_format = region_options.sst_format.unwrap_or_default();
170 sanitize_region_options(&manifest, &mut region_options);
172 if !version.memtables.is_empty() {
173 let current = region.version_control.current();
174 warn!(
175 "Region {} memtables is not empty, which should not happen, manifest version: {}, last entry id: {}",
176 region.region_id, manifest.manifest_version, current.last_entry_id
177 );
178 }
179
180 let memtable_builder = if old_format != region_options.sst_format.unwrap_or_default() {
182 Some(
184 self.memtable_builder_provider
185 .builder_for_options(®ion_options),
186 )
187 } else {
188 None
189 };
190 let new_mutable = Arc::new(
191 region
192 .version()
193 .memtables
194 .mutable
195 .new_with_part_duration(version.compaction_time_window, memtable_builder),
196 );
197 let metadata = manifest.metadata.clone();
199
200 let version_builder = version_builder_from_manifest(
201 &manifest,
202 metadata,
203 region.file_purger.clone(),
204 new_mutable,
205 region_options,
206 );
207 let version = version_builder.build();
208 region.version_control.overwrite_current(Arc::new(version));
209
210 let updated = manifest.manifest_version > original_manifest_version;
211 let _ = sender.send(Ok((manifest.manifest_version, updated)));
212 }
213}
214
215impl<S> RegionWorkerLoop<S> {
216 pub(crate) fn handle_region_edit(&mut self, request: RegionEditRequest) {
218 let region_id = request.region_id;
219 let Some(region) = self.regions.get_region(region_id) else {
220 let _ = request.tx.send(RegionNotFoundSnafu { region_id }.fail());
221 return;
222 };
223
224 if !region.is_writable() {
225 if region.state() == RegionRoleState::Leader(RegionLeaderState::Editing) {
226 self.region_edit_queues
227 .entry(region_id)
228 .or_insert_with(|| RegionEditQueue::new(region_id))
229 .enqueue(request);
230 } else {
231 let _ = request.tx.send(RegionBusySnafu { region_id }.fail());
232 }
233 return;
234 }
235
236 let RegionEditRequest {
237 region_id: _,
238 mut edit,
239 tx: sender,
240 } = request;
241 let file_sequence = region.version_control.committed_sequence() + 1;
242 edit.committed_sequence = Some(file_sequence);
243
244 for file in &mut edit.files_to_add {
246 file.sequence = NonZeroU64::new(file_sequence);
247 }
248
249 let is_staging = region.is_staging();
251 let expect_state = if is_staging {
252 RegionLeaderState::Staging
253 } else {
254 RegionLeaderState::Writable
255 };
256 if let Err(e) = region.set_editing(expect_state) {
258 let _ = sender.send(Err(e));
259 return;
260 }
261
262 let request_sender = self.sender.clone();
263 let cache_manager = self.cache_manager.clone();
264 let listener = self.listener.clone();
265 common_runtime::spawn_global(async move {
268 let result =
269 edit_region(®ion, edit.clone(), cache_manager, listener, is_staging).await;
270 let notify = WorkerRequest::Background {
271 region_id,
272 notify: BackgroundNotify::RegionEdit(RegionEditResult {
273 region_id,
274 sender,
275 edit,
276 result,
277 update_region_state: true,
279 is_staging,
280 }),
281 };
282
283 if let Err(res) = request_sender
285 .send(WorkerRequestWithTime::new(notify))
286 .await
287 {
288 warn!(
289 "Failed to send region edit result back to the worker, region_id: {}, res: {:?}",
290 region_id, res
291 );
292 }
293 });
294 }
295
296 pub(crate) async fn handle_region_edit_result(&mut self, edit_result: RegionEditResult) {
298 let region = match self.regions.get_region(edit_result.region_id) {
299 Some(region) => region,
300 None => {
301 let _ = edit_result.sender.send(
302 RegionNotFoundSnafu {
303 region_id: edit_result.region_id,
304 }
305 .fail(),
306 );
307 return;
308 }
309 };
310
311 let need_compaction = if edit_result.is_staging {
312 if edit_result.update_region_state {
313 region.switch_state_to_staging(RegionLeaderState::Editing);
316 }
317
318 false
319 } else {
320 let need_compaction =
321 edit_result.result.is_ok() && !edit_result.edit.files_to_add.is_empty();
322 if edit_result.result.is_ok() {
324 region.version_control.apply_edit(
326 Some(edit_result.edit),
327 &[],
328 region.file_purger.clone(),
329 );
330 }
331 if edit_result.update_region_state {
332 region.switch_state_to_writable(RegionLeaderState::Editing);
333 }
334
335 need_compaction
336 };
337
338 let _ = edit_result.sender.send(edit_result.result);
339
340 if let Some(edit_queue) = self.region_edit_queues.get_mut(&edit_result.region_id)
341 && let Some(request) = edit_queue.dequeue()
342 {
343 self.handle_region_edit(request);
344 }
345
346 if need_compaction {
347 self.schedule_compaction(®ion).await;
348 }
349 }
350
351 pub(crate) fn handle_manifest_truncate_action(
353 &self,
354 region: MitoRegionRef,
355 truncate: RegionTruncate,
356 sender: OptionOutputTx,
357 ) {
358 if let Err(e) = region.set_truncating() {
361 sender.send(Err(e));
362 return;
363 }
364 let request_sender = self.sender.clone();
367 let manifest_ctx = region.manifest_ctx.clone();
368 let is_staging = region.is_staging();
369
370 common_runtime::spawn_global(async move {
372 let action_list =
374 RegionMetaActionList::with_action(RegionMetaAction::Truncate(truncate.clone()));
375
376 let result = manifest_ctx
377 .update_manifest(RegionLeaderState::Truncating, action_list, is_staging)
378 .await
379 .map(|_| ());
380
381 let truncate_result = TruncateResult {
383 region_id: truncate.region_id,
384 sender,
385 result,
386 kind: truncate.kind,
387 };
388 let _ = request_sender
389 .send(WorkerRequestWithTime::new(WorkerRequest::Background {
390 region_id: truncate.region_id,
391 notify: BackgroundNotify::Truncate(truncate_result),
392 }))
393 .await
394 .inspect_err(|_| warn!("failed to send truncate result"));
395 });
396 }
397
398 pub(crate) fn handle_manifest_region_change(
400 &self,
401 region: MitoRegionRef,
402 change: RegionChange,
403 need_index: bool,
404 new_options: Option<RegionOptions>,
405 sender: OptionOutputTx,
406 ) {
407 if let Err(e) = region.set_altering() {
409 sender.send(Err(e));
410 return;
411 }
412 let listener = self.listener.clone();
413 let request_sender = self.sender.clone();
414 let is_staging = region.is_staging();
415 common_runtime::spawn_global(async move {
417 let new_meta = change.metadata.clone();
418 let action_list = RegionMetaActionList::with_action(RegionMetaAction::Change(change));
419
420 let result = region
421 .manifest_ctx
422 .update_manifest(RegionLeaderState::Altering, action_list, is_staging)
423 .await
424 .map(|_| ());
425 let notify = WorkerRequest::Background {
426 region_id: region.region_id,
427 notify: BackgroundNotify::RegionChange(RegionChangeResult {
428 region_id: region.region_id,
429 sender,
430 result,
431 new_meta,
432 need_index,
433 new_options,
434 }),
435 };
436 listener
437 .on_notify_region_change_result_begin(region.region_id)
438 .await;
439
440 if let Err(res) = request_sender
441 .send(WorkerRequestWithTime::new(notify))
442 .await
443 {
444 warn!(
445 "Failed to send region change result back to the worker, region_id: {}, res: {:?}",
446 region.region_id, res
447 );
448 }
449 });
450 }
451
452 fn update_region_version(
453 version_control: &VersionControlRef,
454 new_meta: RegionMetadataRef,
455 new_options: Option<RegionOptions>,
456 memtable_builder_provider: &MemtableBuilderProvider,
457 ) {
458 let options_changed = new_options.is_some();
459 let region_id = new_meta.region_id;
460 if let Some(new_options) = new_options {
461 let new_memtable_builder = memtable_builder_provider.builder_for_options(&new_options);
464 version_control.alter_schema_and_format(new_meta, new_options, new_memtable_builder);
465 } else {
466 version_control.alter_schema(new_meta);
468 }
469
470 let version_data = version_control.current();
471 let version = version_data.version;
472 info!(
473 "Region {} is altered, metadata is {:?}, options: {:?}, options_changed: {}",
474 region_id, version.metadata, version.options, options_changed,
475 );
476 }
477}
478
479async fn edit_region(
481 region: &MitoRegionRef,
482 edit: RegionEdit,
483 cache_manager: CacheManagerRef,
484 listener: WorkerListener,
485 is_staging: bool,
486) -> Result<()> {
487 let region_id = region.region_id;
488 if let Some(write_cache) = cache_manager.write_cache() {
489 for file_meta in &edit.files_to_add {
490 let write_cache = write_cache.clone();
491 let layer = region.access_layer.clone();
492 let listener = listener.clone();
493
494 let index_key = IndexKey::new(region_id, file_meta.file_id, FileType::Parquet);
495 let remote_path =
496 location::sst_file_path(layer.table_dir(), file_meta.file_id(), layer.path_type());
497
498 let is_index_exist = file_meta.exists_index();
499 let index_file_size = file_meta.index_file_size();
500
501 let index_file_index_key = IndexKey::new(
502 region_id,
503 file_meta.index_id().file_id.file_id(),
504 FileType::Puffin(file_meta.index_version),
505 );
506 let index_remote_path = location::index_file_path(
507 layer.table_dir(),
508 file_meta.index_id(),
509 layer.path_type(),
510 );
511
512 let file_size = file_meta.file_size;
513 common_runtime::spawn_global(async move {
514 WRITE_CACHE_INFLIGHT_DOWNLOAD.add(1);
515
516 if write_cache
517 .download(index_key, &remote_path, layer.object_store(), file_size)
518 .await
519 .is_ok()
520 {
521 let _ = write_cache
524 .file_cache()
525 .get_parquet_meta_data(index_key)
526 .await;
527
528 listener.on_file_cache_filled(index_key.file_id);
529 }
530 if is_index_exist {
531 if let Err(err) = write_cache
533 .download(
534 index_file_index_key,
535 &index_remote_path,
536 layer.object_store(),
537 index_file_size,
538 )
539 .await
540 {
541 common_telemetry::error!(
542 err; "Failed to download puffin file, region_id: {}, index_file_index_key: {:?}, index_remote_path: {}", region_id, index_file_index_key, index_remote_path
543 );
544 }
545 }
546
547 WRITE_CACHE_INFLIGHT_DOWNLOAD.sub(1);
548 });
549 }
550 }
551
552 info!(
553 "Applying {edit:?} to region {}, is_staging: {}",
554 region_id, is_staging
555 );
556
557 let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit));
558 region
559 .manifest_ctx
560 .update_manifest(RegionLeaderState::Editing, action_list, is_staging)
561 .await
562 .map(|_| ())
563}