1use std::collections::{HashMap, VecDeque};
20use std::num::NonZeroU64;
21use std::sync::Arc;
22
23use common_telemetry::{info, warn};
24use store_api::logstore::LogStore;
25use store_api::storage::RegionId;
26
27use crate::cache::CacheManagerRef;
28use crate::cache::file_cache::{FileType, IndexKey};
29use crate::error::{RegionBusySnafu, RegionNotFoundSnafu, Result};
30use crate::manifest::action::{
31 RegionChange, RegionEdit, RegionMetaAction, RegionMetaActionList, RegionTruncate,
32};
33use crate::metrics::WRITE_CACHE_INFLIGHT_DOWNLOAD;
34use crate::region::version::VersionBuilder;
35use crate::region::{MitoRegionRef, RegionLeaderState, RegionRoleState};
36use crate::request::{
37 BackgroundNotify, OptionOutputTx, RegionChangeResult, RegionEditRequest, RegionEditResult,
38 RegionSyncRequest, TruncateResult, WorkerRequest, WorkerRequestWithTime,
39};
40use crate::sst::location;
41use crate::worker::{RegionWorkerLoop, WorkerListener};
42
/// Pending region edit requests, keyed by the id of the region they target.
///
/// Each value is the [`RegionEditQueue`] holding the requests that are waiting for that region.
pub(crate) type RegionEditQueues = HashMap<RegionId, RegionEditQueue>;
44
45pub(crate) struct RegionEditQueue {
50 region_id: RegionId,
51 requests: VecDeque<RegionEditRequest>,
52}
53
54impl RegionEditQueue {
55 const QUEUE_MAX_LEN: usize = 128;
56
57 fn new(region_id: RegionId) -> Self {
58 Self {
59 region_id,
60 requests: VecDeque::new(),
61 }
62 }
63
64 fn enqueue(&mut self, request: RegionEditRequest) {
65 if self.requests.len() > Self::QUEUE_MAX_LEN {
66 let _ = request.tx.send(
67 RegionBusySnafu {
68 region_id: self.region_id,
69 }
70 .fail(),
71 );
72 return;
73 };
74 self.requests.push_back(request);
75 }
76
77 fn dequeue(&mut self) -> Option<RegionEditRequest> {
78 self.requests.pop_front()
79 }
80}
81
82impl<S: LogStore> RegionWorkerLoop<S> {
83 pub(crate) async fn handle_manifest_region_change_result(
85 &mut self,
86 change_result: RegionChangeResult,
87 ) {
88 let region = match self.regions.get_region(change_result.region_id) {
89 Some(region) => region,
90 None => {
91 self.reject_region_stalled_requests(&change_result.region_id);
92 change_result.sender.send(
93 RegionNotFoundSnafu {
94 region_id: change_result.region_id,
95 }
96 .fail(),
97 );
98 return;
99 }
100 };
101
102 if change_result.result.is_ok() {
103 region
105 .version_control
106 .alter_schema(change_result.new_meta, ®ion.memtable_builder);
107
108 let version = region.version();
109 info!(
110 "Region {} is altered, metadata is {:?}, options: {:?}",
111 region.region_id, version.metadata, version.options,
112 );
113 }
114
115 region.switch_state_to_writable(RegionLeaderState::Altering);
117 change_result.sender.send(change_result.result.map(|_| 0));
119
120 self.handle_region_stalled_requests(&change_result.region_id)
122 .await;
123 }
124
125 pub(crate) async fn handle_region_sync(&mut self, request: RegionSyncRequest) {
130 let region_id = request.region_id;
131 let sender = request.sender;
132 let region = match self.regions.follower_region(region_id) {
133 Ok(region) => region,
134 Err(e) => {
135 let _ = sender.send(Err(e));
136 return;
137 }
138 };
139
140 let original_manifest_version = region.manifest_ctx.manifest_version().await;
141 let manifest = match region
142 .manifest_ctx
143 .install_manifest_to(request.manifest_version)
144 .await
145 {
146 Ok(manifest) => manifest,
147 Err(e) => {
148 let _ = sender.send(Err(e));
149 return;
150 }
151 };
152 let version = region.version();
153 if !version.memtables.is_empty() {
154 let current = region.version_control.current();
155 warn!(
156 "Region {} memtables is not empty, which should not happen, manifest version: {}, last entry id: {}",
157 region.region_id, manifest.manifest_version, current.last_entry_id
158 );
159 }
160 let region_options = version.options.clone();
161 let new_mutable = Arc::new(
162 region
163 .version()
164 .memtables
165 .mutable
166 .new_with_part_duration(version.compaction_time_window),
167 );
168 let metadata = manifest.metadata.clone();
169 let version = VersionBuilder::new(metadata, new_mutable)
170 .add_files(region.file_purger.clone(), manifest.files.values().cloned())
171 .flushed_entry_id(manifest.flushed_entry_id)
172 .flushed_sequence(manifest.flushed_sequence)
173 .truncated_entry_id(manifest.truncated_entry_id)
174 .compaction_time_window(manifest.compaction_time_window)
175 .options(region_options)
176 .build();
177 region.version_control.overwrite_current(Arc::new(version));
178
179 let updated = manifest.manifest_version > original_manifest_version;
180 let _ = sender.send(Ok((manifest.manifest_version, updated)));
181 }
182}
183
184impl<S> RegionWorkerLoop<S> {
185 pub(crate) async fn handle_region_edit(&mut self, request: RegionEditRequest) {
187 let region_id = request.region_id;
188 let Some(region) = self.regions.get_region(region_id) else {
189 let _ = request.tx.send(RegionNotFoundSnafu { region_id }.fail());
190 return;
191 };
192
193 if !region.is_writable() {
194 if region.state() == RegionRoleState::Leader(RegionLeaderState::Editing) {
195 self.region_edit_queues
196 .entry(region_id)
197 .or_insert_with(|| RegionEditQueue::new(region_id))
198 .enqueue(request);
199 } else {
200 let _ = request.tx.send(RegionBusySnafu { region_id }.fail());
201 }
202 return;
203 }
204
205 let RegionEditRequest {
206 region_id: _,
207 mut edit,
208 tx: sender,
209 } = request;
210 let file_sequence = region.version_control.committed_sequence() + 1;
211 edit.committed_sequence = Some(file_sequence);
212
213 for file in &mut edit.files_to_add {
215 file.sequence = NonZeroU64::new(file_sequence);
216 }
217
218 if let Err(e) = region.set_editing() {
220 let _ = sender.send(Err(e));
221 return;
222 }
223
224 let request_sender = self.sender.clone();
225 let cache_manager = self.cache_manager.clone();
226 let listener = self.listener.clone();
227 common_runtime::spawn_global(async move {
230 let result = edit_region(®ion, edit.clone(), cache_manager, listener).await;
231 let notify = WorkerRequest::Background {
232 region_id,
233 notify: BackgroundNotify::RegionEdit(RegionEditResult {
234 region_id,
235 sender,
236 edit,
237 result,
238 }),
239 };
240
241 if let Err(res) = request_sender
243 .send(WorkerRequestWithTime::new(notify))
244 .await
245 {
246 warn!(
247 "Failed to send region edit result back to the worker, region_id: {}, res: {:?}",
248 region_id, res
249 );
250 }
251 });
252 }
253
254 pub(crate) async fn handle_region_edit_result(&mut self, edit_result: RegionEditResult) {
256 let region = match self.regions.get_region(edit_result.region_id) {
257 Some(region) => region,
258 None => {
259 let _ = edit_result.sender.send(
260 RegionNotFoundSnafu {
261 region_id: edit_result.region_id,
262 }
263 .fail(),
264 );
265 return;
266 }
267 };
268
269 let need_compaction =
270 edit_result.result.is_ok() && !edit_result.edit.files_to_add.is_empty();
271
272 if edit_result.result.is_ok() {
273 region.version_control.apply_edit(
275 Some(edit_result.edit),
276 &[],
277 region.file_purger.clone(),
278 );
279 }
280
281 region.switch_state_to_writable(RegionLeaderState::Editing);
283
284 let _ = edit_result.sender.send(edit_result.result);
285
286 if let Some(edit_queue) = self.region_edit_queues.get_mut(&edit_result.region_id)
287 && let Some(request) = edit_queue.dequeue()
288 {
289 self.handle_region_edit(request).await;
290 }
291
292 if need_compaction {
293 self.schedule_compaction(®ion).await;
294 }
295 }
296
297 pub(crate) fn handle_manifest_truncate_action(
299 &self,
300 region: MitoRegionRef,
301 truncate: RegionTruncate,
302 sender: OptionOutputTx,
303 ) {
304 if let Err(e) = region.set_truncating() {
307 sender.send(Err(e));
308 return;
309 }
310 let request_sender = self.sender.clone();
313 let manifest_ctx = region.manifest_ctx.clone();
314
315 common_runtime::spawn_global(async move {
317 let action_list =
319 RegionMetaActionList::with_action(RegionMetaAction::Truncate(truncate.clone()));
320
321 let result = manifest_ctx
322 .update_manifest(RegionLeaderState::Truncating, action_list)
323 .await
324 .map(|_| ());
325
326 let truncate_result = TruncateResult {
328 region_id: truncate.region_id,
329 sender,
330 result,
331 kind: truncate.kind,
332 };
333 let _ = request_sender
334 .send(WorkerRequestWithTime::new(WorkerRequest::Background {
335 region_id: truncate.region_id,
336 notify: BackgroundNotify::Truncate(truncate_result),
337 }))
338 .await
339 .inspect_err(|_| warn!("failed to send truncate result"));
340 });
341 }
342
343 pub(crate) fn handle_manifest_region_change(
345 &self,
346 region: MitoRegionRef,
347 change: RegionChange,
348 sender: OptionOutputTx,
349 ) {
350 if let Err(e) = region.set_altering() {
352 sender.send(Err(e));
353 return;
354 }
355 let listener = self.listener.clone();
356 let request_sender = self.sender.clone();
357 common_runtime::spawn_global(async move {
359 let new_meta = change.metadata.clone();
360 let action_list = RegionMetaActionList::with_action(RegionMetaAction::Change(change));
361
362 let result = region
363 .manifest_ctx
364 .update_manifest(RegionLeaderState::Altering, action_list)
365 .await
366 .map(|_| ());
367 let notify = WorkerRequest::Background {
368 region_id: region.region_id,
369 notify: BackgroundNotify::RegionChange(RegionChangeResult {
370 region_id: region.region_id,
371 sender,
372 result,
373 new_meta,
374 }),
375 };
376 listener
377 .on_notify_region_change_result_begin(region.region_id)
378 .await;
379
380 if let Err(res) = request_sender
381 .send(WorkerRequestWithTime::new(notify))
382 .await
383 {
384 warn!(
385 "Failed to send region change result back to the worker, region_id: {}, res: {:?}",
386 region.region_id, res
387 );
388 }
389 });
390 }
391}
392
393async fn edit_region(
395 region: &MitoRegionRef,
396 edit: RegionEdit,
397 cache_manager: CacheManagerRef,
398 listener: WorkerListener,
399) -> Result<()> {
400 let region_id = region.region_id;
401 if let Some(write_cache) = cache_manager.write_cache() {
402 for file_meta in &edit.files_to_add {
403 let write_cache = write_cache.clone();
404 let layer = region.access_layer.clone();
405 let listener = listener.clone();
406
407 let index_key = IndexKey::new(region_id, file_meta.file_id, FileType::Parquet);
408 let remote_path =
409 location::sst_file_path(layer.table_dir(), file_meta.file_id(), layer.path_type());
410
411 let is_index_exist = file_meta.exists_index();
412 let index_file_size = file_meta.index_file_size();
413
414 let index_file_index_key =
415 IndexKey::new(region_id, file_meta.file_id, FileType::Puffin);
416 let index_remote_path = location::index_file_path(
417 layer.table_dir(),
418 file_meta.file_id(),
419 layer.path_type(),
420 );
421
422 let file_size = file_meta.file_size;
423 common_runtime::spawn_global(async move {
424 WRITE_CACHE_INFLIGHT_DOWNLOAD.add(1);
425
426 if write_cache
427 .download(index_key, &remote_path, layer.object_store(), file_size)
428 .await
429 .is_ok()
430 {
431 let _ = write_cache
434 .file_cache()
435 .get_parquet_meta_data(index_key)
436 .await;
437
438 listener.on_file_cache_filled(index_key.file_id);
439 }
440 if is_index_exist {
441 if let Err(err) = write_cache
443 .download(
444 index_file_index_key,
445 &index_remote_path,
446 layer.object_store(),
447 index_file_size,
448 )
449 .await
450 {
451 common_telemetry::error!(
452 err; "Failed to download puffin file, region_id: {}, index_file_index_key: {:?}, index_remote_path: {}", region_id, index_file_index_key, index_remote_path
453 );
454 }
455 }
456
457 WRITE_CACHE_INFLIGHT_DOWNLOAD.sub(1);
458 });
459 }
460 }
461
462 info!("Applying {edit:?} to region {}", region_id);
463
464 let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit));
465 region
466 .manifest_ctx
467 .update_manifest(RegionLeaderState::Editing, action_list)
468 .await
469 .map(|_| ())
470}