1use std::collections::{HashMap, VecDeque};
20use std::sync::Arc;
21
22use common_telemetry::{info, warn};
23use store_api::logstore::LogStore;
24use store_api::storage::RegionId;
25
26use crate::cache::file_cache::{FileType, IndexKey};
27use crate::cache::CacheManagerRef;
28use crate::error::{RegionBusySnafu, RegionNotFoundSnafu, Result};
29use crate::manifest::action::{
30 RegionChange, RegionEdit, RegionMetaAction, RegionMetaActionList, RegionTruncate,
31};
32use crate::metrics::WRITE_CACHE_INFLIGHT_DOWNLOAD;
33use crate::region::version::VersionBuilder;
34use crate::region::{MitoRegionRef, RegionLeaderState, RegionRoleState};
35use crate::request::{
36 BackgroundNotify, OptionOutputTx, RegionChangeResult, RegionEditRequest, RegionEditResult,
37 RegionSyncRequest, TruncateResult, WorkerRequest, WorkerRequestWithTime,
38};
39use crate::sst::location;
40use crate::worker::{RegionWorkerLoop, WorkerListener};
41
42pub(crate) type RegionEditQueues = HashMap<RegionId, RegionEditQueue>;
43
44pub(crate) struct RegionEditQueue {
49 region_id: RegionId,
50 requests: VecDeque<RegionEditRequest>,
51}
52
53impl RegionEditQueue {
54 const QUEUE_MAX_LEN: usize = 128;
55
56 fn new(region_id: RegionId) -> Self {
57 Self {
58 region_id,
59 requests: VecDeque::new(),
60 }
61 }
62
63 fn enqueue(&mut self, request: RegionEditRequest) {
64 if self.requests.len() > Self::QUEUE_MAX_LEN {
65 let _ = request.tx.send(
66 RegionBusySnafu {
67 region_id: self.region_id,
68 }
69 .fail(),
70 );
71 return;
72 };
73 self.requests.push_back(request);
74 }
75
76 fn dequeue(&mut self) -> Option<RegionEditRequest> {
77 self.requests.pop_front()
78 }
79}
80
81impl<S: LogStore> RegionWorkerLoop<S> {
82 pub(crate) async fn handle_manifest_region_change_result(
84 &mut self,
85 change_result: RegionChangeResult,
86 ) {
87 let region = match self.regions.get_region(change_result.region_id) {
88 Some(region) => region,
89 None => {
90 self.reject_region_stalled_requests(&change_result.region_id);
91 change_result.sender.send(
92 RegionNotFoundSnafu {
93 region_id: change_result.region_id,
94 }
95 .fail(),
96 );
97 return;
98 }
99 };
100
101 if change_result.result.is_ok() {
102 region
104 .version_control
105 .alter_schema(change_result.new_meta, ®ion.memtable_builder);
106
107 let version = region.version();
108 info!(
109 "Region {} is altered, metadata is {:?}, options: {:?}",
110 region.region_id, version.metadata, version.options,
111 );
112 }
113
114 region.switch_state_to_writable(RegionLeaderState::Altering);
116 change_result.sender.send(change_result.result.map(|_| 0));
118
119 self.handle_region_stalled_requests(&change_result.region_id)
121 .await;
122 }
123
124 pub(crate) async fn handle_region_sync(&mut self, request: RegionSyncRequest) {
129 let region_id = request.region_id;
130 let sender = request.sender;
131 let region = match self.regions.follower_region(region_id) {
132 Ok(region) => region,
133 Err(e) => {
134 let _ = sender.send(Err(e));
135 return;
136 }
137 };
138
139 let original_manifest_version = region.manifest_ctx.manifest_version().await;
140 let manifest = match region
141 .manifest_ctx
142 .install_manifest_to(request.manifest_version)
143 .await
144 {
145 Ok(manifest) => manifest,
146 Err(e) => {
147 let _ = sender.send(Err(e));
148 return;
149 }
150 };
151 let version = region.version();
152 if !version.memtables.is_empty() {
153 let current = region.version_control.current();
154 warn!(
155 "Region {} memtables is not empty, which should not happen, manifest version: {}, last entry id: {}",
156 region.region_id, manifest.manifest_version, current.last_entry_id
157 );
158 }
159 let region_options = version.options.clone();
160 let new_mutable = Arc::new(
161 region
162 .version()
163 .memtables
164 .mutable
165 .new_with_part_duration(version.compaction_time_window),
166 );
167 let metadata = manifest.metadata.clone();
168 let version = VersionBuilder::new(metadata, new_mutable)
169 .add_files(region.file_purger.clone(), manifest.files.values().cloned())
170 .flushed_entry_id(manifest.flushed_entry_id)
171 .flushed_sequence(manifest.flushed_sequence)
172 .truncated_entry_id(manifest.truncated_entry_id)
173 .compaction_time_window(manifest.compaction_time_window)
174 .options(region_options)
175 .build();
176 region.version_control.overwrite_current(Arc::new(version));
177
178 let updated = manifest.manifest_version > original_manifest_version;
179 let _ = sender.send(Ok((manifest.manifest_version, updated)));
180 }
181}
182
183impl<S> RegionWorkerLoop<S> {
184 pub(crate) async fn handle_region_edit(&mut self, request: RegionEditRequest) {
186 let region_id = request.region_id;
187 let Some(region) = self.regions.get_region(region_id) else {
188 let _ = request.tx.send(RegionNotFoundSnafu { region_id }.fail());
189 return;
190 };
191
192 if !region.is_writable() {
193 if region.state() == RegionRoleState::Leader(RegionLeaderState::Editing) {
194 self.region_edit_queues
195 .entry(region_id)
196 .or_insert_with(|| RegionEditQueue::new(region_id))
197 .enqueue(request);
198 } else {
199 let _ = request.tx.send(RegionBusySnafu { region_id }.fail());
200 }
201 return;
202 }
203
204 let RegionEditRequest {
205 region_id: _,
206 edit,
207 tx: sender,
208 } = request;
209
210 if let Err(e) = region.set_editing() {
212 let _ = sender.send(Err(e));
213 return;
214 }
215
216 let request_sender = self.sender.clone();
217 let cache_manager = self.cache_manager.clone();
218 let listener = self.listener.clone();
219 common_runtime::spawn_global(async move {
222 let result = edit_region(®ion, edit.clone(), cache_manager, listener).await;
223 let notify = WorkerRequest::Background {
224 region_id,
225 notify: BackgroundNotify::RegionEdit(RegionEditResult {
226 region_id,
227 sender,
228 edit,
229 result,
230 }),
231 };
232 if let Err(res) = request_sender
234 .send(WorkerRequestWithTime::new(notify))
235 .await
236 {
237 warn!(
238 "Failed to send region edit result back to the worker, region_id: {}, res: {:?}",
239 region_id, res
240 );
241 }
242 });
243 }
244
245 pub(crate) async fn handle_region_edit_result(&mut self, edit_result: RegionEditResult) {
247 let region = match self.regions.get_region(edit_result.region_id) {
248 Some(region) => region,
249 None => {
250 let _ = edit_result.sender.send(
251 RegionNotFoundSnafu {
252 region_id: edit_result.region_id,
253 }
254 .fail(),
255 );
256 return;
257 }
258 };
259
260 let need_compaction =
261 edit_result.result.is_ok() && !edit_result.edit.files_to_add.is_empty();
262
263 if edit_result.result.is_ok() {
264 region
266 .version_control
267 .apply_edit(edit_result.edit, &[], region.file_purger.clone());
268 }
269
270 region.switch_state_to_writable(RegionLeaderState::Editing);
272
273 let _ = edit_result.sender.send(edit_result.result);
274
275 if let Some(edit_queue) = self.region_edit_queues.get_mut(&edit_result.region_id) {
276 if let Some(request) = edit_queue.dequeue() {
277 self.handle_region_edit(request).await;
278 }
279 }
280
281 if need_compaction {
282 self.schedule_compaction(®ion).await;
283 }
284 }
285
286 pub(crate) fn handle_manifest_truncate_action(
288 &self,
289 region: MitoRegionRef,
290 truncate: RegionTruncate,
291 sender: OptionOutputTx,
292 ) {
293 if let Err(e) = region.set_truncating() {
296 sender.send(Err(e));
297 return;
298 }
299 let request_sender = self.sender.clone();
302 let manifest_ctx = region.manifest_ctx.clone();
303
304 common_runtime::spawn_global(async move {
306 let action_list =
308 RegionMetaActionList::with_action(RegionMetaAction::Truncate(truncate.clone()));
309
310 let result = manifest_ctx
311 .update_manifest(RegionLeaderState::Truncating, action_list)
312 .await
313 .map(|_| ());
314
315 let truncate_result = TruncateResult {
317 region_id: truncate.region_id,
318 sender,
319 result,
320 truncated_entry_id: truncate.truncated_entry_id,
321 truncated_sequence: truncate.truncated_sequence,
322 };
323 let _ = request_sender
324 .send(WorkerRequestWithTime::new(WorkerRequest::Background {
325 region_id: truncate.region_id,
326 notify: BackgroundNotify::Truncate(truncate_result),
327 }))
328 .await
329 .inspect_err(|_| warn!("failed to send truncate result"));
330 });
331 }
332
333 pub(crate) fn handle_manifest_region_change(
335 &self,
336 region: MitoRegionRef,
337 change: RegionChange,
338 sender: OptionOutputTx,
339 ) {
340 if let Err(e) = region.set_altering() {
342 sender.send(Err(e));
343 return;
344 }
345 let listener = self.listener.clone();
346 let request_sender = self.sender.clone();
347 common_runtime::spawn_global(async move {
349 let new_meta = change.metadata.clone();
350 let action_list = RegionMetaActionList::with_action(RegionMetaAction::Change(change));
351
352 let result = region
353 .manifest_ctx
354 .update_manifest(RegionLeaderState::Altering, action_list)
355 .await
356 .map(|_| ());
357 let notify = WorkerRequest::Background {
358 region_id: region.region_id,
359 notify: BackgroundNotify::RegionChange(RegionChangeResult {
360 region_id: region.region_id,
361 sender,
362 result,
363 new_meta,
364 }),
365 };
366 listener
367 .on_notify_region_change_result_begin(region.region_id)
368 .await;
369
370 if let Err(res) = request_sender
371 .send(WorkerRequestWithTime::new(notify))
372 .await
373 {
374 warn!(
375 "Failed to send region change result back to the worker, region_id: {}, res: {:?}",
376 region.region_id, res
377 );
378 }
379 });
380 }
381}
382
383async fn edit_region(
385 region: &MitoRegionRef,
386 edit: RegionEdit,
387 cache_manager: CacheManagerRef,
388 listener: WorkerListener,
389) -> Result<()> {
390 let region_id = region.region_id;
391 if let Some(write_cache) = cache_manager.write_cache() {
392 for file_meta in &edit.files_to_add {
393 let write_cache = write_cache.clone();
394 let layer = region.access_layer.clone();
395 let listener = listener.clone();
396
397 let index_key = IndexKey::new(region_id, file_meta.file_id, FileType::Parquet);
398 let remote_path =
399 location::sst_file_path(layer.table_dir(), file_meta.file_id(), layer.path_type());
400
401 let is_index_exist = file_meta.exists_index();
402 let index_file_size = file_meta.index_file_size();
403
404 let index_file_index_key =
405 IndexKey::new(region_id, file_meta.file_id, FileType::Puffin);
406 let index_remote_path = location::index_file_path(
407 layer.table_dir(),
408 file_meta.file_id(),
409 layer.path_type(),
410 );
411
412 let file_size = file_meta.file_size;
413 common_runtime::spawn_global(async move {
414 WRITE_CACHE_INFLIGHT_DOWNLOAD.add(1);
415
416 if write_cache
417 .download(index_key, &remote_path, layer.object_store(), file_size)
418 .await
419 .is_ok()
420 {
421 let _ = write_cache
424 .file_cache()
425 .get_parquet_meta_data(index_key)
426 .await;
427
428 listener.on_file_cache_filled(index_key.file_id);
429 }
430 if is_index_exist {
431 if let Err(err) = write_cache
433 .download(
434 index_file_index_key,
435 &index_remote_path,
436 layer.object_store(),
437 index_file_size,
438 )
439 .await
440 {
441 common_telemetry::error!(
442 err; "Failed to download puffin file, region_id: {}, index_file_index_key: {:?}, index_remote_path: {}", region_id, index_file_index_key, index_remote_path
443 );
444 }
445 }
446
447 WRITE_CACHE_INFLIGHT_DOWNLOAD.sub(1);
448 });
449 }
450 }
451
452 info!("Applying {edit:?} to region {}", region_id);
453
454 let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit));
455 region
456 .manifest_ctx
457 .update_manifest(RegionLeaderState::Editing, action_list)
458 .await
459 .map(|_| ())
460}