1use std::any::Any;
16use std::collections::{BTreeSet, HashMap, HashSet};
17use std::sync::Arc;
18use std::time::Duration;
19
20use api::v1::meta::MailboxMessage;
21use common_meta::instruction::{self, GcRegions, GetFileRefs, GetFileRefsReply, InstructionReply};
22use common_meta::key::TableMetadataManagerRef;
23use common_meta::key::table_repart::TableRepartValue;
24use common_meta::key::table_route::PhysicalTableRouteValue;
25use common_meta::lock_key::{RegionLock, TableLock};
26use common_meta::peer::Peer;
27use common_procedure::error::ToJsonSnafu;
28use common_procedure::{
29 Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
30 Result as ProcedureResult, Status,
31};
32use common_telemetry::{debug, error, info, warn};
33use itertools::Itertools as _;
34use serde::{Deserialize, Serialize};
35use snafu::ResultExt as _;
36use store_api::storage::{FileRefsManifest, GcReport, RegionId};
37use table::metadata::TableId;
38
39use crate::error::{self, KvBackendSnafu, Result, SerializeToJsonSnafu, TableMetadataManagerSnafu};
40use crate::gc::util::table_route_to_region;
41use crate::gc::{Peer2Regions, Region2Peers};
42use crate::handler::HeartbeatMailbox;
43use crate::metrics::{METRIC_META_GC_DATANODE_CALLS_TOTAL, METRIC_META_GC_FAILED_REGIONS_TOTAL};
44use crate::service::mailbox::{Channel, MailboxRef};
45
46async fn send_get_file_refs(
48 mailbox: &MailboxRef,
49 server_addr: &str,
50 peer: &Peer,
51 instruction: GetFileRefs,
52 timeout: Duration,
53) -> Result<GetFileRefsReply> {
54 let result = send_get_file_refs_inner(mailbox, server_addr, peer, instruction, timeout).await;
55
56 match &result {
57 Ok(_) => METRIC_META_GC_DATANODE_CALLS_TOTAL
58 .with_label_values(&["get_file_refs", "success"])
59 .inc(),
60 Err(_) => METRIC_META_GC_DATANODE_CALLS_TOTAL
61 .with_label_values(&["get_file_refs", "error"])
62 .inc(),
63 }
64
65 result
66}
67
/// Builds a `GetFileRefs` mailbox message, sends it to the given datanode,
/// and waits (up to `timeout`) for the typed reply.
///
/// # Errors
/// - serialization of the instruction to JSON fails;
/// - the mailbox send or the reply receive fails;
/// - the datanode answers with a reply of an unexpected kind.
async fn send_get_file_refs_inner(
    mailbox: &MailboxRef,
    server_addr: &str,
    peer: &Peer,
    instruction: GetFileRefs,
    timeout: Duration,
) -> Result<GetFileRefsReply> {
    let instruction = instruction::Instruction::GetFileRefs(instruction);
    let msg = MailboxMessage::json_message(
        &format!("Get file references: {}", instruction),
        &format!("Metasrv@{}", server_addr),
        &format!("Datanode-{}@{}", peer.id, peer.addr),
        common_time::util::current_time_millis(),
        &instruction,
    )
    .with_context(|_| SerializeToJsonSnafu {
        input: instruction.to_string(),
    })?;

    let mailbox_rx = mailbox
        .send(&Channel::Datanode(peer.id), msg, timeout)
        .await?;

    // Wait for the datanode's reply; log before propagating receive errors
    // so the failing peer is visible in the metasrv logs.
    let reply = match mailbox_rx.await {
        Ok(reply_msg) => HeartbeatMailbox::json_reply(&reply_msg)?,
        Err(e) => {
            error!(
                e; "Failed to receive reply from datanode {} for GetFileRefs instruction",
                peer,
            );
            return Err(e);
        }
    };

    // Any reply variant other than `GetFileRefs` is a protocol mismatch.
    let InstructionReply::GetFileRefs(reply) = reply else {
        return error::UnexpectedInstructionReplySnafu {
            mailbox_message: format!("{:?}", reply),
            reason: "Unexpected reply of the GetFileRefs instruction",
        }
        .fail();
    };

    Ok(reply)
}
112
113async fn send_gc_regions(
115 mailbox: &MailboxRef,
116 peer: &Peer,
117 gc_regions: GcRegions,
118 server_addr: &str,
119 timeout: Duration,
120 description: &str,
121) -> Result<GcReport> {
122 let failed_region_count = gc_regions.regions.len() as u64;
123 let result =
124 send_gc_regions_inner(mailbox, peer, gc_regions, server_addr, timeout, description).await;
125
126 match result {
127 Ok(report) => {
128 METRIC_META_GC_DATANODE_CALLS_TOTAL
129 .with_label_values(&["gc_regions", "success"])
130 .inc();
131
132 let need_retry_count = report.need_retry_regions.len() as u64;
133 if need_retry_count > 0 {
134 METRIC_META_GC_FAILED_REGIONS_TOTAL.inc_by(need_retry_count);
135 }
136
137 Ok(report)
138 }
139 Err(e) => {
140 METRIC_META_GC_DATANODE_CALLS_TOTAL
141 .with_label_values(&["gc_regions", "error"])
142 .inc();
143 if failed_region_count > 0 {
144 METRIC_META_GC_FAILED_REGIONS_TOTAL.inc_by(failed_region_count);
145 }
146 Err(e)
147 }
148 }
149}
150
/// Builds a `GcRegions` mailbox message, sends it to the given datanode, and
/// waits (up to `timeout`) for the GC report.
///
/// `description` labels the message subject and log lines so different GC
/// callers can be distinguished.
///
/// # Errors
/// - serialization of the instruction to JSON fails;
/// - the mailbox send or the reply receive fails;
/// - the datanode answers with a reply of an unexpected kind;
/// - the datanode itself reports a GC error (mapped to `Unexpected`).
async fn send_gc_regions_inner(
    mailbox: &MailboxRef,
    peer: &Peer,
    gc_regions: GcRegions,
    server_addr: &str,
    timeout: Duration,
    description: &str,
) -> Result<GcReport> {
    // Cloned so the original request can still be shown in error messages.
    let instruction = instruction::Instruction::GcRegions(gc_regions.clone());
    let msg = MailboxMessage::json_message(
        &format!("{}: {}", description, instruction),
        &format!("Metasrv@{}", server_addr),
        &format!("Datanode-{}@{}", peer.id, peer.addr),
        common_time::util::current_time_millis(),
        &instruction,
    )
    .with_context(|_| SerializeToJsonSnafu {
        input: instruction.to_string(),
    })?;

    let mailbox_rx = mailbox
        .send(&Channel::Datanode(peer.id), msg, timeout)
        .await?;

    let reply = match mailbox_rx.await {
        Ok(reply_msg) => HeartbeatMailbox::json_reply(&reply_msg)?,
        Err(e) => {
            error!(
                e; "Failed to receive reply from datanode {} for {}",
                peer, description
            );
            return Err(e);
        }
    };

    // Any reply variant other than `GcRegions` is a protocol mismatch.
    let InstructionReply::GcRegions(reply) = reply else {
        return error::UnexpectedInstructionReplySnafu {
            mailbox_message: format!("{:?}", reply),
            reason: "Unexpected reply of the GcRegions instruction",
        }
        .fail();
    };

    // The reply carries its own Result: the datanode may have received the
    // instruction fine but failed while performing the GC.
    let res = reply.result;
    match res {
        Ok(report) => Ok(report),
        Err(e) => {
            error!(
                e; "Datanode {} reported error during GC for regions {:?}",
                peer, gc_regions
            );
            error::UnexpectedSnafu {
                violated: format!(
                    "Datanode {} reported error during GC for regions {:?}: {}",
                    peer, gc_regions, e
                ),
            }
            .fail()
        }
    }
}
212
/// Procedure that garbage-collects files for a batch of regions: it gathers
/// file references from datanodes, instructs them to GC, and then cleans up
/// per-table repartition metadata.
pub struct BatchGcProcedure {
    /// Mailbox used to send instructions to datanodes.
    mailbox: MailboxRef,
    /// Access to table routes and repartition metadata in the metadata store.
    table_metadata_manager: TableMetadataManagerRef,
    /// Serializable state of the procedure (survives restarts via `dump`).
    data: BatchGcData,
}
220
/// Persisted state of [`BatchGcProcedure`].
#[derive(Serialize, Deserialize)]
pub struct BatchGcData {
    /// Current step of the GC state machine.
    state: State,
    /// Address of this metasrv, used as the mailbox sender identity.
    server_addr: String,
    /// Regions to garbage-collect.
    regions: Vec<RegionId>,
    /// Whether datanodes should perform a full file listing during GC.
    full_file_listing: bool,
    /// Region -> (leader, followers) routes discovered from table routes.
    region_routes: Region2Peers,
    /// Extra routes supplied by the caller; used as a fallback for regions
    /// that route discovery cannot resolve (e.g. already-dropped regions).
    #[serde(default)]
    region_routes_override: Region2Peers,
    /// For each target region, other regions of the same table whose
    /// manifests may still reference its files.
    related_regions: HashMap<RegionId, HashSet<RegionId>>,
    /// Aggregated file-reference manifest collected from datanodes.
    file_refs: FileRefsManifest,
    /// Per-call mailbox timeout.
    timeout: Duration,
    /// GC report accumulated in the `Gcing` step; consumed when the
    /// procedure finishes.
    gc_report: Option<GcReport>,
}
242
/// Steps of the batch-GC state machine, executed in declaration order.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum State {
    /// Initial state; transitions directly to `Acquiring`.
    Start,
    /// Acquiring file references from datanodes.
    Acquiring,
    /// Sending GC instructions to datanodes.
    Gcing,
    /// Cleaning up per-table repartition metadata after GC.
    UpdateRepartition,
}
254
255impl BatchGcProcedure {
256 pub const TYPE_NAME: &'static str = "metasrv-procedure::BatchGcProcedure";
257
258 pub fn new(
259 mailbox: MailboxRef,
260 table_metadata_manager: TableMetadataManagerRef,
261 server_addr: String,
262 regions: Vec<RegionId>,
263 full_file_listing: bool,
264 timeout: Duration,
265 region_routes_override: Region2Peers,
266 ) -> Self {
267 Self {
268 mailbox,
269 table_metadata_manager,
270 data: BatchGcData {
271 state: State::Start,
272 server_addr,
273 regions,
274 full_file_listing,
275 timeout,
276 region_routes: HashMap::new(),
277 region_routes_override,
278 related_regions: HashMap::new(),
279 file_refs: FileRefsManifest::default(),
280 gc_report: None,
281 },
282 }
283 }
284
285 #[cfg(feature = "mock")]
289 pub fn new_update_repartition_for_test(
290 mailbox: MailboxRef,
291 table_metadata_manager: TableMetadataManagerRef,
292 server_addr: String,
293 regions: Vec<RegionId>,
294 file_refs: FileRefsManifest,
295 timeout: Duration,
296 ) -> Self {
297 Self {
298 mailbox,
299 table_metadata_manager,
300 data: BatchGcData {
301 state: State::UpdateRepartition,
302 server_addr,
303 regions,
304 full_file_listing: false,
305 timeout,
306 region_routes: HashMap::new(),
307 region_routes_override: HashMap::new(),
308 related_regions: HashMap::new(),
309 file_refs,
310 gc_report: Some(GcReport::default()),
311 },
312 }
313 }
314
315 pub fn cast_result(res: Arc<dyn Any>) -> Result<GcReport> {
316 res.downcast_ref::<GcReport>().cloned().ok_or_else(|| {
317 error::UnexpectedSnafu {
318 violated: format!(
319 "Failed to downcast procedure result to GcReport, got {:?}",
320 std::any::type_name_of_val(&res.as_ref())
321 ),
322 }
323 .build()
324 })
325 }
326
    /// Resolves the physical table route for `table_id` (which may be a
    /// logical table id); returns the physical table id together with its
    /// route value.
    async fn get_table_route(
        &self,
        table_id: TableId,
    ) -> Result<(TableId, PhysicalTableRouteValue)> {
        self.table_metadata_manager
            .table_route_manager()
            .get_physical_table_route(table_id)
            .await
            .context(TableMetadataManagerSnafu)
    }
337
338 async fn find_related_regions(
342 &self,
343 regions: &[RegionId],
344 ) -> Result<HashMap<RegionId, HashSet<RegionId>>> {
345 let table_ids: HashSet<TableId> = regions.iter().map(|r| r.table_id()).collect();
346 let table_ids = table_ids.into_iter().collect::<Vec<_>>();
347
348 let table_routes = self
349 .table_metadata_manager
350 .table_route_manager()
351 .batch_get_physical_table_routes(&table_ids)
352 .await
353 .context(TableMetadataManagerSnafu)?;
354
355 if table_routes.len() != table_ids.len() {
356 for table_id in &table_ids {
358 if !table_routes.contains_key(table_id) {
359 return error::InvalidArgumentsSnafu {
361 err_msg: format!(
362 "Unexpected logical table route: table {} resolved to physical table regions",
363 table_id
364 ),
365 }
366 .fail();
367 }
368 }
369 }
370
371 let mut table_all_regions: HashMap<TableId, HashSet<RegionId>> = HashMap::new();
372 for (table_id, table_route) in table_routes {
373 let all_regions: HashSet<RegionId> = table_route
374 .region_routes
375 .iter()
376 .map(|r| r.region.id)
377 .collect();
378
379 table_all_regions.insert(table_id, all_regions);
380 }
381
382 let mut related_regions: HashMap<RegionId, HashSet<RegionId>> = HashMap::new();
383 for region_id in regions {
384 let table_id = region_id.table_id();
385 if let Some(all_regions) = table_all_regions.get(&table_id) {
386 let mut related: HashSet<RegionId> = all_regions.clone();
387 related.remove(region_id);
388 related_regions.insert(*region_id, related);
389 } else {
390 related_regions.insert(*region_id, Default::default());
391 }
392 }
393
394 Ok(related_regions)
395 }
396
    /// Rewrites per-table repartition metadata so it matches the file
    /// references observed during this GC run: source regions with
    /// cross-region references keep their destination sets, regions with
    /// only temporary file refs keep an empty entry, and everything else is
    /// removed. Each table is updated under its table write lock.
    async fn cleanup_region_repartition(&self, procedure_ctx: &ProcedureContext) -> Result<()> {
        // Group cross-region references by source table:
        // table -> (src region -> dst regions).
        let mut cross_refs_grouped: HashMap<TableId, HashMap<RegionId, HashSet<RegionId>>> =
            HashMap::new();
        for (src_region, dst_regions) in &self.data.file_refs.cross_region_refs {
            cross_refs_grouped
                .entry(src_region.table_id())
                .or_default()
                .entry(*src_region)
                .or_default()
                .extend(dst_regions.iter().copied());
        }

        // Regions that still have (non-empty) temporary file refs, grouped
        // by table.
        let mut tmp_refs_grouped: HashMap<TableId, HashSet<RegionId>> = HashMap::new();
        for (src_region, refs) in &self.data.file_refs.file_refs {
            if refs.is_empty() {
                continue;
            }

            tmp_refs_grouped
                .entry(src_region.table_id())
                .or_default()
                .insert(*src_region);
        }

        let repart_mgr = self.table_metadata_manager.table_repart_manager();

        // Visit every table touched by refs as well as tables of the GC'd
        // regions themselves, so stale entries for them get cleaned up too.
        let mut table_ids: HashSet<TableId> = cross_refs_grouped
            .keys()
            .copied()
            .chain(tmp_refs_grouped.keys().copied())
            .collect();
        table_ids.extend(self.data.regions.iter().map(|r| r.table_id()));

        for table_id in table_ids {
            // Serialize with other procedures mutating this table's metadata.
            let table_lock = TableLock::Write(table_id).into();
            let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await;

            let cross_refs = cross_refs_grouped
                .get(&table_id)
                .cloned()
                .unwrap_or_default();
            let tmp_refs = tmp_refs_grouped.get(&table_id).cloned().unwrap_or_default();

            // Raw bytes are kept so the later upsert can do a CAS against
            // the value read here.
            let current = repart_mgr
                .get_with_raw_bytes(table_id)
                .await
                .context(KvBackendSnafu)?;

            let mut new_value = current
                .as_ref()
                .map(|v| (**v).clone())
                .unwrap_or_else(TableRepartValue::new);

            // Only source regions belonging to this table and to this GC
            // batch are reconciled.
            let batch_src_regions: HashSet<RegionId> = self
                .data
                .regions
                .iter()
                .copied()
                .filter(|r| r.table_id() == table_id)
                .collect();

            let all_src_regions: HashSet<RegionId> = batch_src_regions;

            for src_region in all_src_regions {
                let cross_dst = cross_refs.get(&src_region);
                let has_tmp_ref = tmp_refs.contains(&src_region);

                if let Some(dst_regions) = cross_dst {
                    // Still referenced across regions: record the dst set.
                    let mut set = BTreeSet::new();
                    set.extend(dst_regions.iter().copied());
                    new_value.src_to_dst.insert(src_region, set);
                } else if has_tmp_ref {
                    // Only temporary refs remain: keep an empty entry as a
                    // marker that the region is not fully released yet.
                    new_value.src_to_dst.insert(src_region, BTreeSet::new());
                } else {
                    // No refs at all: drop the repartition entry.
                    new_value.src_to_dst.remove(&src_region);
                }
            }

            // Nothing stored and nothing to store: skip the write entirely.
            if new_value.src_to_dst.is_empty() && current.is_none() {
                continue;
            }

            repart_mgr
                .upsert_value(table_id, current, &new_value)
                .await
                .context(KvBackendSnafu)?;
        }

        Ok(())
    }
497
498 async fn discover_route_for_regions(
500 &self,
501 regions: &[RegionId],
502 ) -> Result<(Region2Peers, Peer2Regions)> {
503 let mut region_to_peer = HashMap::new();
504 let mut peer_to_regions = HashMap::new();
505
506 let mut table_to_regions: HashMap<TableId, Vec<RegionId>> = HashMap::new();
508 for region_id in regions {
509 let table_id = region_id.table_id();
510 table_to_regions
511 .entry(table_id)
512 .or_default()
513 .push(*region_id);
514 }
515
516 for (table_id, table_regions) in table_to_regions {
518 match self.get_table_route(table_id).await {
519 Ok((_phy_table_id, table_route)) => {
520 table_route_to_region(
521 &table_route,
522 &table_regions,
523 &mut region_to_peer,
524 &mut peer_to_regions,
525 );
526 }
527 Err(e) => {
528 warn!(
531 "Failed to get table route for table {}: {}, skipping its regions",
532 table_id, e
533 );
534 continue;
535 }
536 }
537 }
538
539 Ok((region_to_peer, peer_to_regions))
540 }
541
542 async fn set_routes_and_related_regions(&mut self) -> Result<()> {
544 let related_regions = self.find_related_regions(&self.data.regions).await?;
545
546 self.data.related_regions = related_regions.clone();
547
548 let mut regions_set: HashSet<RegionId> = self.data.regions.iter().cloned().collect();
551
552 regions_set.extend(related_regions.keys().cloned());
553 regions_set.extend(related_regions.values().flat_map(|v| v.iter()).cloned());
554
555 let regions_to_discover = regions_set.into_iter().collect_vec();
556
557 let (mut region_to_peer, _) = self
558 .discover_route_for_regions(®ions_to_discover)
559 .await?;
560
561 for (region_id, route) in &self.data.region_routes_override {
562 region_to_peer
563 .entry(*region_id)
564 .or_insert_with(|| route.clone());
565 }
566
567 self.data.region_routes = region_to_peer;
568
569 Ok(())
570 }
571
    /// Collects file references for the GC target regions from all involved
    /// datanodes and merges the per-datanode manifests into one.
    ///
    /// Query regions are sent to both leader and follower peers; related
    /// regions only to the leader of each related region. Regions covered by
    /// the route override (treated as dropped) are not queried.
    ///
    /// Merge rules: file refs are unioned per region, manifest versions take
    /// the minimum seen per region, and cross-region refs are unioned.
    async fn get_file_references(&mut self) -> Result<FileRefsManifest> {
        self.set_routes_and_related_regions().await?;

        let query_regions = &self.data.regions;
        let related_regions = &self.data.related_regions;
        let region_routes = &self.data.region_routes;
        let timeout = self.data.timeout;
        // Regions with an override route are considered dropped and are not
        // queried for refs.
        let dropped_regions = self
            .data
            .region_routes_override
            .keys()
            .collect::<HashSet<_>>();

        // peer -> query regions hosted on that peer (leader and followers).
        let mut datanode2query_regions: HashMap<Peer, Vec<RegionId>> = HashMap::new();

        for region_id in query_regions {
            if dropped_regions.contains(region_id) {
                continue;
            }
            if let Some((leader, followers)) = region_routes.get(region_id) {
                datanode2query_regions
                    .entry(leader.clone())
                    .or_default()
                    .push(*region_id);
                for follower in followers {
                    datanode2query_regions
                        .entry(follower.clone())
                        .or_default()
                        .push(*region_id);
                }
            } else {
                // Every non-dropped query region must have a route.
                return error::UnexpectedSnafu {
                    violated: format!(
                        "region_routes: {region_routes:?} does not contain region_id: {region_id}",
                    ),
                }
                .fail();
            }
        }

        // peer -> (src region -> related regions hosted by that peer's leader).
        let mut datanode2related_regions: HashMap<Peer, HashMap<RegionId, HashSet<RegionId>>> =
            HashMap::new();
        for (src_region, dst_regions) in related_regions {
            for dst_region in dst_regions {
                // Related regions without a route are silently skipped.
                if let Some((leader, _followers)) = region_routes.get(dst_region) {
                    datanode2related_regions
                        .entry(leader.clone())
                        .or_default()
                        .entry(*src_region)
                        .or_default()
                        .insert(*dst_region);
                }
            }
        }

        // Accumulators for the merged manifest.
        let mut all_file_refs: HashMap<RegionId, HashSet<_>> = HashMap::new();
        let mut all_manifest_versions = HashMap::new();
        let mut all_cross_region_refs = HashMap::new();

        // Every peer that hosts either query regions or related regions.
        let mut peers = HashSet::new();
        peers.extend(datanode2query_regions.keys().cloned());
        peers.extend(datanode2related_regions.keys().cloned());

        for peer in peers {
            let regions = datanode2query_regions.remove(&peer).unwrap_or_default();
            let related_regions_for_peer =
                datanode2related_regions.remove(&peer).unwrap_or_default();

            if regions.is_empty() && related_regions_for_peer.is_empty() {
                continue;
            }

            let instruction = GetFileRefs {
                query_regions: regions.clone(),
                related_regions: related_regions_for_peer.clone(),
            };

            let reply = send_get_file_refs(
                &self.mailbox,
                &self.data.server_addr,
                &peer,
                instruction,
                timeout,
            )
            .await?;
            debug!(
                "Got file references from datanode: {:?}, query_regions: {:?}, related_regions: {:?}, reply: {:?}",
                peer, regions, related_regions_for_peer, reply
            );

            // A single failing datanode aborts the whole acquisition; GC
            // must not proceed with an incomplete ref set.
            if !reply.success {
                return error::UnexpectedSnafu {
                    violated: format!(
                        "Failed to get file references from datanode {}: {:?}",
                        peer, reply.error
                    ),
                }
                .fail();
            }

            // Union file refs per region.
            for (region_id, file_refs) in reply.file_refs_manifest.file_refs {
                all_file_refs
                    .entry(region_id)
                    .or_default()
                    .extend(file_refs);
            }

            // Keep the minimum manifest version seen per region.
            for (region_id, version) in reply.file_refs_manifest.manifest_version {
                let entry = all_manifest_versions.entry(region_id).or_insert(version);
                *entry = (*entry).min(version);
            }

            // Union cross-region references per region.
            for (region_id, related_region_ids) in reply.file_refs_manifest.cross_region_refs {
                let entry = all_cross_region_refs
                    .entry(region_id)
                    .or_insert_with(HashSet::new);
                entry.extend(related_region_ids);
            }
        }

        Ok(FileRefsManifest {
            file_refs: all_file_refs,
            manifest_version: all_manifest_versions,
            cross_region_refs: all_cross_region_refs,
        })
    }
704
    /// Sends `GcRegions` instructions to the leader datanode of every target
    /// region, using the previously collected file references, and merges
    /// the per-datanode reports into one [`GcReport`].
    ///
    /// Fails if any target region has no route or any datanode call fails;
    /// regions the datanodes flag for retry are only logged here.
    async fn send_gc_instructions(&self) -> Result<GcReport> {
        let regions = &self.data.regions;
        let region_routes = &self.data.region_routes;
        let file_refs = &self.data.file_refs;
        let timeout = self.data.timeout;

        // leader peer -> regions it must GC.
        let mut datanode2regions: HashMap<Peer, Vec<RegionId>> = HashMap::new();
        let mut all_report = GcReport::default();

        for region_id in regions {
            if let Some((leader, _followers)) = region_routes.get(region_id) {
                datanode2regions
                    .entry(leader.clone())
                    .or_default()
                    .push(*region_id);
            } else {
                // Unlike ref acquisition, GC requires a route for every
                // target region.
                return error::UnexpectedSnafu {
                    violated: format!(
                        "region_routes: {region_routes:?} does not contain region_id: {region_id}",
                    ),
                }
                .fail();
            }
        }

        let mut all_need_retry = HashSet::new();
        for (peer, regions_for_peer) in datanode2regions {
            // Each datanode gets the full ref manifest plus its own regions.
            let gc_regions = GcRegions {
                regions: regions_for_peer.clone(),
                file_refs_manifest: file_refs.clone(),
                full_file_listing: self.data.full_file_listing,
            };

            let report = send_gc_regions(
                &self.mailbox,
                &peer,
                gc_regions,
                self.data.server_addr.as_str(),
                timeout,
                "Batch GC",
            )
            .await?;

            let success = report.deleted_files.keys().collect_vec();
            let need_retry = report.need_retry_regions.iter().cloned().collect_vec();

            if need_retry.is_empty() {
                info!(
                    "GC report from datanode {}: successfully deleted files for regions {:?}",
                    peer, success
                );
            } else {
                warn!(
                    "GC report from datanode {}: successfully deleted files for regions {:?}, need retry for regions {:?}",
                    peer, success, need_retry
                );
            }
            all_need_retry.extend(report.need_retry_regions.clone());
            all_report.merge(report);
        }

        if !all_need_retry.is_empty() {
            warn!("Regions need retry after batch GC: {:?}", all_need_retry);
        }

        Ok(all_report)
    }
777}
778
#[async_trait::async_trait]
impl Procedure for BatchGcProcedure {
    fn type_name(&self) -> &str {
        Self::TYPE_NAME
    }

    /// Drives the GC state machine one step per call:
    /// `Start -> Acquiring -> Gcing -> UpdateRepartition -> done`.
    /// Each intermediate step returns `Status::executing(false)` so the
    /// procedure framework re-invokes `execute` for the next step.
    async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
        match self.data.state {
            State::Start => {
                self.data.state = State::Acquiring;
                Ok(Status::executing(false))
            }
            State::Acquiring => {
                // Collect file references from datanodes before any deletion.
                match self.get_file_references().await {
                    Ok(file_refs) => {
                        self.data.file_refs = file_refs;
                        self.data.state = State::Gcing;
                        Ok(Status::executing(false))
                    }
                    Err(e) => {
                        error!(e; "Failed to get file references");
                        Err(ProcedureError::external(e))
                    }
                }
            }
            State::Gcing => {
                // Instruct datanodes to GC; keep the report for the output.
                match self.send_gc_instructions().await {
                    Ok(report) => {
                        self.data.state = State::UpdateRepartition;
                        self.data.gc_report = Some(report);
                        Ok(Status::executing(false))
                    }
                    Err(e) => {
                        error!(e; "Failed to send GC instructions");
                        Err(ProcedureError::external(e))
                    }
                }
            }
            State::UpdateRepartition => match self.cleanup_region_repartition(ctx).await {
                Ok(()) => {
                    info!(
                        "Cleanup region repartition info completed successfully for regions {:?}",
                        self.data.regions
                    );
                    info!(
                        "Batch GC completed successfully for regions {:?}",
                        self.data.regions
                    );
                    // The report was stored in the Gcing step; its absence
                    // here indicates a broken state transition.
                    let Some(report) = self.data.gc_report.take() else {
                        return common_procedure::error::UnexpectedSnafu {
                            err_msg: "GC report should be present after GC completion".to_string(),
                        }
                        .fail();
                    };
                    info!("GC report: {:?}", report);
                    Ok(Status::done_with_output(report))
                }
                Err(e) => {
                    error!(e; "Failed to cleanup region repartition info");
                    Err(ProcedureError::external(e))
                }
            },
        }
    }

    /// Serializes the procedure state so it can be recovered after restart.
    fn dump(&self) -> ProcedureResult<String> {
        serde_json::to_string(&self.data).context(ToJsonSnafu)
    }

    /// Takes read locks on all target regions, sorted to give a stable lock
    /// acquisition order.
    fn lock_key(&self) -> LockKey {
        let lock_key: Vec<_> = self
            .data
            .regions
            .iter()
            .sorted()
            .map(|id| RegionLock::Read(*id).into())
            .collect();

        LockKey::new(lock_key)
    }
}
865}