use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;

use common_meta::datanode::GcStat;
use common_telemetry::{debug, error, info, warn};
use common_time::Timestamp;
use itertools::Itertools;
use object_store::{Entry, Lister};
use serde::{Deserialize, Serialize};
use snafu::ResultExt as _;
use store_api::storage::{FileId, FileRef, FileRefsManifest, GcReport, IndexVersion, RegionId};
use tokio::sync::{OwnedSemaphorePermit, TryAcquireError};
use tokio_stream::StreamExt;

use crate::access_layer::AccessLayerRef;
use crate::cache::CacheManagerRef;
use crate::cache::file_cache::FileType;
use crate::config::MitoConfig;
use crate::error::{
    DurationOutOfRangeSnafu, JoinSnafu, OpenDalSnafu, Result, TooManyGcJobsSnafu, UnexpectedSnafu,
};
use crate::manifest::action::{RegionManifest, RemovedFile};
use crate::metrics::{GC_DELETE_FILE_CNT, GC_ORPHANED_INDEX_FILES, GC_SKIPPED_UNPARSABLE_FILES};
use crate::region::{MitoRegionRef, RegionRoleState};
use crate::sst::file::{RegionFileId, RegionIndexId, delete_files, delete_index};
use crate::sst::location::{self};

#[cfg(test)]
mod worker_test;
/// Decides whether a listed file can be deleted.
///
/// A file is deletable only if it is neither referenced by the current manifest
/// nor by any temporary file reference. Known files (recently removed or already
/// eligible) are deleted once they are eligible; lingering files are kept, and
/// unknown files are only deleted after their lingering period has expired.
fn should_delete_file(
    is_in_manifest: bool,
    is_in_tmp_ref: bool,
    is_linger: bool,
    is_eligible_for_delete: bool,
    entry: &Entry,
    unknown_file_may_linger_until: chrono::DateTime<chrono::Utc>,
) -> bool {
    let is_known = is_linger || is_eligible_for_delete;

    let is_unknown_linger_time_exceeded = || {
        entry
            .metadata()
            .last_modified()
            .map(|t| t < unknown_file_may_linger_until)
            .unwrap_or(false)
    };

    !is_in_manifest
        && !is_in_tmp_ref
        && if is_known {
            is_eligible_for_delete
        } else {
            is_unknown_linger_time_exceeded()
        }
}

/// Limits how many GC jobs can run concurrently on this datanode.
pub struct GcLimiter {
    /// Semaphore that caps the number of concurrently running GC jobs.
    pub gc_job_limit: Arc<tokio::sync::Semaphore>,
    gc_concurrency: usize,
}

pub type GcLimiterRef = Arc<GcLimiter>;

impl GcLimiter {
    pub fn new(gc_concurrency: usize) -> Self {
        Self {
            gc_job_limit: Arc::new(tokio::sync::Semaphore::new(gc_concurrency)),
            gc_concurrency,
        }
    }

    /// Number of GC tasks currently holding a permit.
    pub fn running_gc_tasks(&self) -> u32 {
        (self.gc_concurrency - self.gc_job_limit.available_permits()) as u32
    }

    pub fn gc_concurrency(&self) -> u32 {
        self.gc_concurrency as u32
    }

    pub fn gc_stat(&self) -> GcStat {
        GcStat::new(self.running_gc_tasks(), self.gc_concurrency())
    }

    /// Tries to acquire a permit for a new GC job without waiting.
    ///
    /// Returns a `TooManyGcJobs` error if all permits are currently in use.
    pub fn permit(&self) -> Result<OwnedSemaphorePermit> {
        self.gc_job_limit
            .clone()
            .try_acquire_owned()
            .map_err(|e| match e {
                TryAcquireError::Closed => UnexpectedSnafu {
                    reason: format!("Failed to acquire gc permit: {e}"),
                }
                .build(),
                TryAcquireError::NoPermits => TooManyGcJobsSnafu {}.build(),
            })
    }
}
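
/// Configuration for SST file garbage collection.
///
/// An illustrative TOML snippet (a sketch only: the key names mirror the
/// struct fields, durations use humantime strings, and the enclosing section
/// path depends on where this config is embedded):
///
/// ```toml
/// enable = true
/// lingering_time = "60s"
/// unknown_file_lingering_time = "1h"
/// max_concurrent_lister_per_gc_job = 32
/// max_concurrent_gc_job = 4
/// ```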
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct GcConfig {
    /// Whether GC is enabled.
    pub enable: bool,
    /// How long a file removed from the manifest may linger before it becomes
    /// eligible for deletion. `None` means removed files can be deleted immediately.
    #[serde(with = "humantime_serde")]
    pub lingering_time: Option<Duration>,
    /// How long a file unknown to the manifest must stay unmodified before it
    /// may be deleted.
    #[serde(with = "humantime_serde")]
    pub unknown_file_lingering_time: Duration,
    /// Maximum number of concurrent listers used by a single GC job.
    pub max_concurrent_lister_per_gc_job: usize,
    /// Maximum number of GC jobs that can run concurrently.
    pub max_concurrent_gc_job: usize,
}

impl Default for GcConfig {
    fn default() -> Self {
        Self {
            enable: false,
            lingering_time: Some(Duration::from_secs(60)),
            unknown_file_lingering_time: Duration::from_secs(60 * 60),
            max_concurrent_lister_per_gc_job: 32,
            max_concurrent_gc_job: 4,
        }
    }
}

/// A worker that runs garbage collection for a set of regions on the local datanode.
pub struct LocalGcWorker {
    pub(crate) access_layer: AccessLayerRef,
    pub(crate) cache_manager: Option<CacheManagerRef>,
    /// Regions to run GC on.
    pub(crate) regions: BTreeMap<RegionId, MitoRegionRef>,
    pub(crate) opt: GcConfig,
    /// Snapshot of temporary file references, together with the manifest
    /// version each region was at when the snapshot was taken.
    pub(crate) file_ref_manifest: FileRefsManifest,
    /// Permit held for the lifetime of this GC job.
    _permit: OwnedSemaphorePermit,
    /// Whether to do a full object-store listing, which also detects files
    /// unknown to the manifest (e.g. orphaned index files).
    pub full_file_listing: bool,
}

pub struct ManifestOpenConfig {
    pub compress_manifest: bool,
    pub manifest_checkpoint_distance: u64,
    pub experimental_manifest_keep_removed_file_count: usize,
    pub experimental_manifest_keep_removed_file_ttl: Duration,
}

impl From<MitoConfig> for ManifestOpenConfig {
    fn from(mito_config: MitoConfig) -> Self {
        Self {
            compress_manifest: mito_config.compress_manifest,
            manifest_checkpoint_distance: mito_config.manifest_checkpoint_distance,
            experimental_manifest_keep_removed_file_count: mito_config
                .experimental_manifest_keep_removed_file_count,
            experimental_manifest_keep_removed_file_ttl: mito_config
                .experimental_manifest_keep_removed_file_ttl,
        }
    }
}

impl LocalGcWorker {
    /// Creates a new `LocalGcWorker`, acquiring a permit from the GC limiter.
    ///
    /// Returns an error if too many GC jobs are already running.
    #[allow(clippy::too_many_arguments)]
    pub async fn try_new(
        access_layer: AccessLayerRef,
        cache_manager: Option<CacheManagerRef>,
        regions_to_gc: BTreeMap<RegionId, MitoRegionRef>,
        opt: GcConfig,
        file_ref_manifest: FileRefsManifest,
        limiter: &GcLimiterRef,
        full_file_listing: bool,
    ) -> Result<Self> {
        let permit = limiter.permit()?;

        Ok(Self {
            access_layer,
            cache_manager,
            regions: regions_to_gc,
            opt,
            file_ref_manifest,
            _permit: permit,
            full_file_listing,
        })
    }

    /// Collects the temporary file references of each region from the file ref manifest.
    pub async fn read_tmp_ref_files(&self) -> Result<HashMap<RegionId, HashSet<FileRef>>> {
        let mut tmp_ref_files = HashMap::new();
        for (region_id, file_refs) in &self.file_ref_manifest.file_refs {
            tmp_ref_files
                .entry(*region_id)
                .or_insert_with(HashSet::new)
                .extend(file_refs.clone());
        }

        Ok(tmp_ref_files)
    }
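
    /// Runs GC for every region assigned to this worker and returns a
    /// [`GcReport`] describing the deleted data and index files.
    ///
    /// A minimal usage sketch (assuming an already constructed access layer,
    /// region map, file ref manifest and limiter; the variable names are
    /// illustrative only):
    ///
    /// ```ignore
    /// let worker = LocalGcWorker::try_new(
    ///     access_layer,
    ///     Some(cache_manager),
    ///     regions_to_gc,
    ///     GcConfig::default(),
    ///     file_ref_manifest,
    ///     &gc_limiter,
    ///     /* full_file_listing */ true,
    /// )
    /// .await?;
    /// let report: GcReport = worker.run().await?;
    /// ```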
    pub async fn run(self) -> Result<GcReport> {
        info!("LocalGcWorker started");
        let now = std::time::Instant::now();

        let mut deleted_files = HashMap::new();
        let mut deleted_indexes = HashMap::new();
        let tmp_ref_files = self.read_tmp_ref_files().await?;
        for (region_id, region) in &self.regions {
            let per_region_time = std::time::Instant::now();
            // GC must only run on leader regions.
            if region.manifest_ctx.current_state() == RegionRoleState::Follower {
                return UnexpectedSnafu {
                    reason: format!(
                        "Region {} is in Follower state, should not run GC on follower regions",
                        region_id
                    ),
                }
                .fail();
            }
            let tmp_ref_files = tmp_ref_files
                .get(region_id)
                .cloned()
                .unwrap_or_else(HashSet::new);
            let files = self.do_region_gc(region.clone(), &tmp_ref_files).await?;
            let index_files = files
                .iter()
                .filter_map(|f| f.index_version().map(|v| (f.file_id(), v)))
                .collect_vec();
            deleted_files.insert(*region_id, files.into_iter().map(|f| f.file_id()).collect());
            deleted_indexes.insert(*region_id, index_files);
            debug!(
                "GC for region {} took {} secs.",
                region_id,
                per_region_time.elapsed().as_secs_f32()
            );
        }
        info!(
            "LocalGcWorker finished after {} secs.",
            now.elapsed().as_secs_f32()
        );
        let report = GcReport {
            deleted_files,
            deleted_indexes,
            need_retry_regions: HashSet::new(),
        };
        Ok(report)
    }
}

impl LocalGcWorker {
    /// Target number of in-manifest files handled per concurrent lister when
    /// doing a full listing.
    pub const CONCURRENCY_LIST_PER_FILES: usize = 1024;

    /// Runs GC for a single region and returns the files that were deleted.
    ///
    /// Skips the region if the temporary file references were taken at a
    /// different manifest version than the current one.
    pub async fn do_region_gc(
        &self,
        region: MitoRegionRef,
        tmp_ref_files: &HashSet<FileRef>,
    ) -> Result<Vec<RemovedFile>> {
        let region_id = region.region_id();

        debug!("Doing gc for region {}", region_id);

        let manifest = region.manifest_ctx.manifest().await;
        let file_ref_manifest_version = self
            .file_ref_manifest
            .manifest_version
            .get(&region.region_id())
            .cloned();
        if file_ref_manifest_version != Some(manifest.manifest_version) {
            warn!(
                "Tmp ref files manifest version {:?} does not match region {} manifest version {}, skip gc for this region",
                file_ref_manifest_version,
                region.region_id(),
                manifest.manifest_version
            );
            return Ok(vec![]);
        }

        let all_entries = if self.full_file_listing {
            self.list_from_object_store(region.region_id(), manifest.clone())
                .await?
        } else {
            vec![]
        };

        let region_id = manifest.metadata.region_id;
        let current_files = &manifest.files;

        let recently_removed_files = self.get_removed_files_expel_times(&manifest).await?;

        if recently_removed_files.is_empty() {
            // Still continue: a full listing may find files unknown to the manifest.
            debug!("No recently removed files to gc for region {}", region_id);
        }

        let removed_file_cnt = recently_removed_files
            .values()
            .map(|s| s.len())
            .sum::<usize>();

        let in_manifest = current_files
            .iter()
            .map(|(file_id, meta)| (*file_id, meta.index_version()))
            .collect::<HashMap<_, _>>();

        let in_tmp_ref = tmp_ref_files
            .iter()
            .map(|file_ref| (file_ref.file_id, file_ref.index_version))
            .collect::<HashSet<_>>();

        let deletable_files = self
            .list_to_be_deleted_files(
                region_id,
                &in_manifest,
                &in_tmp_ref,
                recently_removed_files,
                all_entries,
            )
            .await?;

        let unused_file_cnt = deletable_files.len();

        debug!(
            "gc: for region {region_id}: in manifest files: {}, tmp ref file cnt: {}, recently removed files: {}, unused files to delete: {}",
            current_files.len(),
            tmp_ref_files.len(),
            removed_file_cnt,
            deletable_files.len()
        );

        debug!(
            "Found {} unused files to delete for region {}",
            deletable_files.len(),
            region_id
        );

        self.delete_files(region_id, &deletable_files).await?;

        debug!(
            "Successfully deleted {} unused files for region {}",
            unused_file_cnt, region_id
        );
        self.update_manifest_removed_files(&region, deletable_files.clone())
            .await?;

        Ok(deletable_files)
    }

    /// Deletes the given removed files (data and index files) from object storage
    /// through the access layer and cache manager.
    async fn delete_files(&self, region_id: RegionId, removed_files: &[RemovedFile]) -> Result<()> {
        let mut index_ids = vec![];
        let file_pairs = removed_files
            .iter()
            .filter_map(|f| match f {
                RemovedFile::File(file_id, v) => Some((*file_id, v.unwrap_or(0))),
                RemovedFile::Index(file_id, index_version) => {
                    let region_index_id =
                        RegionIndexId::new(RegionFileId::new(region_id, *file_id), *index_version);
                    index_ids.push(region_index_id);
                    None
                }
            })
            .collect_vec();
        delete_files(
            region_id,
            &file_pairs,
            true,
            &self.access_layer,
            &self.cache_manager,
        )
        .await?;

        for index_id in index_ids {
            delete_index(index_id, &self.access_layer, &self.cache_manager).await?;
        }

        GC_DELETE_FILE_CNT.add(removed_files.len() as i64);

        Ok(())
    }

    /// Clears the deleted files from the region manifest's removed-files record.
    async fn update_manifest_removed_files(
        &self,
        region: &MitoRegionRef,
        deleted_files: Vec<RemovedFile>,
    ) -> Result<()> {
        let deleted_file_cnt = deleted_files.len();
        debug!(
            "Trying to update manifest for {deleted_file_cnt} removed files for region {}",
            region.region_id()
        );

        let mut manager = region.manifest_ctx.manifest_manager.write().await;
        manager.clear_deleted_files(deleted_files);
        debug!(
            "Updated region_id={} region manifest to clear {deleted_file_cnt} deleted files",
            region.region_id(),
        );

        Ok(())
    }

    /// Groups the removed files recorded in the region manifest by the
    /// timestamp at which they were expelled from the manifest.
    pub async fn get_removed_files_expel_times(
        &self,
        region_manifest: &Arc<RegionManifest>,
    ) -> Result<BTreeMap<Timestamp, HashSet<RemovedFile>>> {
        let mut ret = BTreeMap::new();
        for files in &region_manifest.removed_files.removed_files {
            let expel_time = Timestamp::new_millisecond(files.removed_at);
            let set = ret.entry(expel_time).or_insert_with(HashSet::new);
            set.extend(files.files.iter().cloned());
        }

        Ok(ret)
    }

    /// Splits the region directory into lexicographic ranges (the number of
    /// ranges is `concurrency` rounded up to a power of two) and creates one
    /// lister per range. Each lister is returned together with its exclusive
    /// upper bound (`None` for the last range).
    async fn partition_region_files(
        &self,
        region_id: RegionId,
        concurrency: usize,
    ) -> Result<Vec<(Lister, Option<String>)>> {
        let region_dir = self.access_layer.build_region_dir(region_id);

        let partitions = gen_partition_from_concurrency(concurrency);
        let bounds = vec![None]
            .into_iter()
            .chain(partitions.iter().map(|p| Some(p.clone())))
            .chain(vec![None])
            .collect::<Vec<_>>();

        let mut listers = vec![];
        for part in bounds.windows(2) {
            let start = part[0].clone();
            let end = part[1].clone();
            let mut lister = self.access_layer.object_store().lister_with(&region_dir);
            if let Some(s) = start {
                lister = lister.start_after(&s);
            }

            let lister = lister.await.context(OpenDalSnafu)?;
            listers.push((lister, end));
        }

        Ok(listers)
    }

    /// Lists all files of the region directly from the object store, using
    /// multiple concurrent listers for large regions.
    async fn list_from_object_store(
        &self,
        region_id: RegionId,
        manifest: Arc<RegionManifest>,
    ) -> Result<Vec<Entry>> {
        let start = tokio::time::Instant::now();
        let current_files = &manifest.files;
        // One lister per `CONCURRENCY_LIST_PER_FILES` in-manifest files, capped by config.
        let concurrency = (current_files.len() / Self::CONCURRENCY_LIST_PER_FILES)
            .max(1)
            .min(self.opt.max_concurrent_lister_per_gc_job);

        let listers = self.partition_region_files(region_id, concurrency).await?;
        let lister_cnt = listers.len();

        let all_entries = self.list_region_files_concurrent(listers).await?;
        let cnt = all_entries.len();
        info!(
            "gc: full listing mode cost {} secs using {lister_cnt} listers for {cnt} files in region {}.",
            start.elapsed().as_secs_f64(),
            region_id
        );
        Ok(all_entries)
    }

    /// Runs the given listers concurrently and gathers the file entries they produce.
    ///
    /// Each lister stops at its exclusive upper bound (if any), and only file
    /// entries are kept.
    async fn list_region_files_concurrent(
        &self,
        listers: Vec<(Lister, Option<String>)>,
    ) -> Result<Vec<Entry>> {
        let (tx, mut rx) = tokio::sync::mpsc::channel(1024);
        let mut handles = vec![];

        for (lister, end) in listers {
            let tx = tx.clone();
            let handle = tokio::spawn(async move {
                // Stop once the entry name reaches the exclusive upper bound of this range.
                let stream = lister.take_while(|e: &std::result::Result<Entry, _>| match e {
                    Ok(e) => {
                        if let Some(end) = &end {
                            e.name() < end.as_str()
                        } else {
                            true
                        }
                    }
                    Err(err) => {
                        warn!("Failed to list entry: {}", err);
                        true
                    }
                });
                // Keep only file entries; errors are passed through and filtered out below.
                let stream = stream
                    .filter(|e| {
                        if let Ok(e) = &e {
                            e.metadata().is_file()
                        } else {
                            true
                        }
                    })
                    .collect::<Vec<_>>()
                    .await;
                tx.send(stream).await.expect("Failed to send entries");
            });

            handles.push(handle);
        }

        for handle in handles {
            handle.await.context(JoinSnafu)?;
        }

        // All task senders are finished; drop ours so that `recv` terminates.
        drop(tx);

        let mut all_entries = vec![];
        while let Some(stream) = rx.recv().await {
            all_entries.extend(stream.into_iter().filter_map(Result::ok));
        }

        Ok(all_entries)
    }

    /// Filters the listed entries down to files that can be safely deleted.
    ///
    /// Data (parquet) files and index (puffin) files are matched against the
    /// manifest, the temporary references, the lingering set and the
    /// eligible-for-delete set; unknown files are only deleted after their
    /// lingering period has expired.
    fn filter_deletable_files(
        &self,
        entries: Vec<Entry>,
        in_manifest: &HashMap<FileId, Option<IndexVersion>>,
        in_tmp_ref: &HashSet<(FileId, Option<IndexVersion>)>,
        may_linger_files: &HashSet<&RemovedFile>,
        eligible_for_delete: &HashSet<&RemovedFile>,
        unknown_file_may_linger_until: chrono::DateTime<chrono::Utc>,
    ) -> Vec<RemovedFile> {
        let mut ready_for_delete = vec![];
        // Group the lookups by file id for cheap per-entry checks.
        let in_tmp_ref: HashMap<FileId, HashSet<IndexVersion>> =
            in_tmp_ref
                .iter()
                .fold(HashMap::new(), |mut acc, (file, version)| {
                    let indices = acc.entry(*file).or_default();
                    if let Some(version) = version {
                        indices.insert(*version);
                    }
                    acc
                });

        let may_linger_files: HashMap<FileId, HashSet<&RemovedFile>> = may_linger_files
            .iter()
            .fold(HashMap::new(), |mut acc, file| {
                let indices = acc.entry(file.file_id()).or_default();
                indices.insert(file);
                acc
            });

        let eligible_for_delete: HashMap<FileId, HashSet<&RemovedFile>> = eligible_for_delete
            .iter()
            .fold(HashMap::new(), |mut acc, file| {
                let indices = acc.entry(file.file_id()).or_default();
                indices.insert(file);
                acc
            });

        for entry in entries {
            let (file_id, file_type) = match location::parse_file_id_type_from_path(entry.name()) {
                Ok((file_id, file_type)) => (file_id, file_type),
                Err(err) => {
                    error!(err; "Failed to parse file id from path: {}", entry.name());
                    GC_SKIPPED_UNPARSABLE_FILES.inc();
                    continue;
                }
            };

            let should_delete = match file_type {
                FileType::Parquet => {
                    let is_in_manifest = in_manifest.contains_key(&file_id);
                    let is_in_tmp_ref = in_tmp_ref.contains_key(&file_id);
                    let is_linger = may_linger_files.contains_key(&file_id);
                    let is_eligible_for_delete = eligible_for_delete.contains_key(&file_id);

                    should_delete_file(
                        is_in_manifest,
                        is_in_tmp_ref,
                        is_linger,
                        is_eligible_for_delete,
                        &entry,
                        unknown_file_may_linger_until,
                    )
                }
                FileType::Puffin(version) => {
                    // Index files are matched by (file id, index version).
                    let is_in_manifest = in_manifest
                        .get(&file_id)
                        .map(|opt_ver| *opt_ver == Some(version))
                        .unwrap_or(false);
                    let is_in_tmp_ref = in_tmp_ref
                        .get(&file_id)
                        .map(|versions| versions.contains(&version))
                        .unwrap_or(false);
                    let is_linger = may_linger_files
                        .get(&file_id)
                        .map(|files| files.contains(&&RemovedFile::Index(file_id, version)))
                        .unwrap_or(false);
                    let is_eligible_for_delete = eligible_for_delete
                        .get(&file_id)
                        .map(|files| files.contains(&&RemovedFile::Index(file_id, version)))
                        .unwrap_or(false);

                    should_delete_file(
                        is_in_manifest,
                        is_in_tmp_ref,
                        is_linger,
                        is_eligible_for_delete,
                        &entry,
                        unknown_file_may_linger_until,
                    )
                }
            };

            if should_delete {
                let removed_file = match file_type {
                    FileType::Parquet => RemovedFile::File(file_id, None),
                    FileType::Puffin(version) => {
                        GC_ORPHANED_INDEX_FILES.inc();
                        RemovedFile::Index(file_id, version)
                    }
                };
                ready_for_delete.push(removed_file);
            }
        }
        ready_for_delete
    }

    /// Computes the list of files that should be deleted for the region.
    ///
    /// In fast mode (no full listing) only files recorded as removed in the
    /// manifest are considered; in full listing mode the listed entries are
    /// also checked so that orphaned and unknown files can be collected.
    pub async fn list_to_be_deleted_files(
        &self,
        region_id: RegionId,
        in_manifest: &HashMap<FileId, Option<IndexVersion>>,
        in_tmp_ref: &HashSet<(FileId, Option<IndexVersion>)>,
        recently_removed_files: BTreeMap<Timestamp, HashSet<RemovedFile>>,
        all_entries: Vec<Entry>,
    ) -> Result<Vec<RemovedFile>> {
        let now = chrono::Utc::now();
        let may_linger_until = self
            .opt
            .lingering_time
            .map(|lingering_time| {
                chrono::Duration::from_std(lingering_time)
                    .with_context(|_| DurationOutOfRangeSnafu {
                        input: lingering_time,
                    })
                    .map(|t| now - t)
            })
            .transpose()?;

        let unknown_file_may_linger_until = now
            - chrono::Duration::from_std(self.opt.unknown_file_lingering_time).with_context(
                |_| DurationOutOfRangeSnafu {
                    input: self.opt.unknown_file_lingering_time,
                },
            )?;

        // Files removed after the threshold keep lingering; older ones become eligible.
        let threshold =
            may_linger_until.map(|until| Timestamp::new_millisecond(until.timestamp_millis()));
        let mut recently_removed_files = recently_removed_files;
        let may_linger_files = match threshold {
            Some(threshold) => recently_removed_files.split_off(&threshold),
            None => BTreeMap::new(),
        };
        debug!("may_linger_files: {:?}", may_linger_files);

        let all_may_linger_files = may_linger_files.values().flatten().collect::<HashSet<_>>();

        let eligible_for_removal = recently_removed_files
            .values()
            .flatten()
            .collect::<HashSet<_>>();

        if !self.full_file_listing {
            // Fast mode: only consider files the manifest recorded as removed,
            // and skip any that are still referenced.
            let files_to_delete: Vec<RemovedFile> = eligible_for_removal
                .iter()
                .filter(|file| {
                    let in_use = match file {
                        RemovedFile::File(file_id, index_version) => {
                            in_manifest.get(file_id) == Some(index_version)
                                || in_tmp_ref.contains(&(*file_id, *index_version))
                        }
                        RemovedFile::Index(file_id, index_version) => {
                            in_manifest.get(file_id) == Some(&Some(*index_version))
                                || in_tmp_ref.contains(&(*file_id, Some(*index_version)))
                        }
                    };
                    !in_use
                })
                .map(|&f| f.clone())
                .collect();

            info!(
                "gc: fast mode (no full listing) for region {}, found {} files to delete from manifest",
                region_id,
                files_to_delete.len()
            );

            return Ok(files_to_delete);
        }

        let all_unused_files_ready_for_delete = self.filter_deletable_files(
            all_entries,
            in_manifest,
            in_tmp_ref,
            &all_may_linger_files,
            &eligible_for_removal,
            unknown_file_may_linger_until,
        );

        Ok(all_unused_files_ready_for_delete)
    }
}

/// Generates lexicographic partition boundaries for listing a region directory
/// concurrently.
///
/// The concurrency is rounded up to the next power of two `n`, and `n - 1`
/// evenly spaced boundaries over the hexadecimal name space are returned as
/// zero-padded hex strings, splitting it into `n` ranges.
fn gen_partition_from_concurrency(concurrency: usize) -> Vec<String> {
    let n = concurrency.next_power_of_two();
    if n <= 1 {
        return vec![];
    }

    // Find the smallest number of hex digits `d` such that 16^d >= n.
    let mut d = 0;
    let mut p: u128 = 1;
    while p < n as u128 {
        p *= 16;
        d += 1;
    }

    let total_space = p;
    let step = total_space / n as u128;

    (1..n)
        .map(|i| {
            let boundary = i as u128 * step;
            format!("{:0width$x}", boundary, width = d)
        })
        .collect()
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_gen_partition_from_concurrency() {
        let partitions = gen_partition_from_concurrency(1);
        assert!(partitions.is_empty());

        let partitions = gen_partition_from_concurrency(2);
        assert_eq!(partitions, vec!["8"]);

        let partitions = gen_partition_from_concurrency(3);
        assert_eq!(partitions, vec!["4", "8", "c"]);

        let partitions = gen_partition_from_concurrency(4);
        assert_eq!(partitions, vec!["4", "8", "c"]);

        let partitions = gen_partition_from_concurrency(8);
        assert_eq!(partitions, vec!["2", "4", "6", "8", "a", "c", "e"]);

        let partitions = gen_partition_from_concurrency(16);
        assert_eq!(
            partitions,
            vec![
                "1", "2", "3", "4", "5", "6", "7", "8", "9", "a", "b", "c", "d", "e", "f"
            ]
        );

        let partitions = gen_partition_from_concurrency(32);
        assert_eq!(
            partitions,
            [
                "08", "10", "18", "20", "28", "30", "38", "40", "48", "50", "58", "60", "68", "70",
                "78", "80", "88", "90", "98", "a0", "a8", "b0", "b8", "c0", "c8", "d0", "d8", "e0",
                "e8", "f0", "f8",
            ]
        );
    }
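
    // A minimal sketch of the `GcLimiter` permit bookkeeping, added for
    // illustration: with a concurrency of 1, the first permit succeeds, a
    // second is rejected, and dropping the permit frees the slot again.
    #[test]
    fn test_gc_limiter_permits() {
        let limiter = GcLimiter::new(1);
        assert_eq!(limiter.gc_concurrency(), 1);
        assert_eq!(limiter.running_gc_tasks(), 0);

        let permit = limiter.permit().expect("first permit should be granted");
        assert_eq!(limiter.running_gc_tasks(), 1);
        // All permits are in use, so the next request must fail.
        assert!(limiter.permit().is_err());

        drop(permit);
        assert_eq!(limiter.running_gc_tasks(), 0);
        assert!(limiter.permit().is_ok());
    }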
}