1use std::sync::{Arc, RwLock};
27use std::time::Duration;
28
29use common_telemetry::info;
30use store_api::metadata::RegionMetadataRef;
31use store_api::storage::SequenceNumber;
32
33use crate::error::Result;
34use crate::manifest::action::{RegionEdit, TruncateKind};
35use crate::memtable::time_partition::{TimePartitions, TimePartitionsRef};
36use crate::memtable::version::{MemtableVersion, MemtableVersionRef};
37use crate::memtable::{MemtableBuilderRef, MemtableId};
38use crate::region::options::RegionOptions;
39use crate::sst::file::FileMeta;
40use crate::sst::file_purger::FilePurgerRef;
41use crate::sst::version::{SstVersion, SstVersionRef};
42use crate::wal::EntryId;
43
44#[derive(Debug)]
49pub(crate) struct VersionControl {
50 data: RwLock<VersionControlData>,
51}
52
53impl VersionControl {
54 pub(crate) fn new(version: Version) -> VersionControl {
56 let (flushed_sequence, flushed_entry_id) =
58 (version.flushed_sequence, version.flushed_entry_id);
59 VersionControl {
60 data: RwLock::new(VersionControlData {
61 version: Arc::new(version),
62 committed_sequence: flushed_sequence,
63 last_entry_id: flushed_entry_id,
64 is_dropped: false,
65 }),
66 }
67 }
68
69 pub(crate) fn current(&self) -> VersionControlData {
71 self.data.read().unwrap().clone()
72 }
73
74 pub(crate) fn set_sequence_and_entry_id(&self, seq: SequenceNumber, entry_id: EntryId) {
76 let mut data = self.data.write().unwrap();
77 data.committed_sequence = seq;
78 data.last_entry_id = entry_id;
79 }
80
81 pub(crate) fn set_entry_id(&self, entry_id: EntryId) {
83 let mut data = self.data.write().unwrap();
84 data.last_entry_id = entry_id;
85 }
86
87 pub(crate) fn committed_sequence(&self) -> SequenceNumber {
89 self.data.read().unwrap().committed_sequence
90 }
91
92 pub(crate) fn freeze_mutable(&self) -> Result<()> {
94 let version = self.current().version;
95 let time_window = version.compaction_time_window;
96
97 let Some(new_memtables) = version
98 .memtables
99 .freeze_mutable(&version.metadata, time_window)?
100 else {
101 return Ok(());
102 };
103
104 let new_version = Arc::new(
106 VersionBuilder::from_version(version)
107 .memtables(new_memtables)
108 .build(),
109 );
110
111 let mut version_data = self.data.write().unwrap();
112 version_data.version = new_version;
113
114 Ok(())
115 }
116
117 pub(crate) fn alter_options(&self, options: RegionOptions) {
119 let version = self.current().version;
120 let new_version = Arc::new(
121 VersionBuilder::from_version(version)
122 .options(options)
123 .build(),
124 );
125 let mut version_data = self.data.write().unwrap();
126 version_data.version = new_version;
127 }
128
129 pub(crate) fn apply_edit(
131 &self,
132 edit: RegionEdit,
133 memtables_to_remove: &[MemtableId],
134 purger: FilePurgerRef,
135 ) {
136 let version = self.current().version;
137 let new_version = Arc::new(
138 VersionBuilder::from_version(version)
139 .apply_edit(edit, purger)
140 .remove_memtables(memtables_to_remove)
141 .build(),
142 );
143
144 let mut version_data = self.data.write().unwrap();
145 version_data.version = new_version;
146 }
147
148 pub(crate) fn mark_dropped(&self, memtable_builder: &MemtableBuilderRef) {
150 let version = self.current().version;
151 let part_duration = Some(version.memtables.mutable.part_duration());
152 let next_memtable_id = version.memtables.mutable.next_memtable_id();
153 let new_mutable = Arc::new(TimePartitions::new(
154 version.metadata.clone(),
155 memtable_builder.clone(),
156 next_memtable_id,
157 part_duration,
158 ));
159
160 let mut data = self.data.write().unwrap();
161 data.is_dropped = true;
162 data.version.ssts.mark_all_deleted();
163 let new_version =
165 Arc::new(VersionBuilder::new(version.metadata.clone(), new_mutable).build());
166 data.version = new_version;
167 }
168
169 pub(crate) fn alter_schema(&self, metadata: RegionMetadataRef, builder: &MemtableBuilderRef) {
174 let version = self.current().version;
175 let part_duration = Some(version.memtables.mutable.part_duration());
176 let next_memtable_id = version.memtables.mutable.next_memtable_id();
177 let new_mutable = Arc::new(TimePartitions::new(
178 metadata.clone(),
179 builder.clone(),
180 next_memtable_id,
181 part_duration,
182 ));
183 debug_assert!(version.memtables.mutable.is_empty());
184 debug_assert!(version.memtables.immutables().is_empty());
185 let new_version = Arc::new(
186 VersionBuilder::from_version(version)
187 .metadata(metadata)
188 .memtables(MemtableVersion::new(new_mutable))
189 .build(),
190 );
191
192 let mut version_data = self.data.write().unwrap();
193 version_data.version = new_version;
194 }
195
196 pub(crate) fn truncate(
198 &self,
199 truncate_kind: TruncateKind,
200 memtable_builder: &MemtableBuilderRef,
201 ) {
202 let version = self.current().version;
203
204 let part_duration = version.memtables.mutable.part_duration();
205 let next_memtable_id = version.memtables.mutable.next_memtable_id();
206 let new_mutable = Arc::new(TimePartitions::new(
207 version.metadata.clone(),
208 memtable_builder.clone(),
209 next_memtable_id,
210 Some(part_duration),
211 ));
212 match truncate_kind {
213 TruncateKind::All {
214 truncated_entry_id,
215 truncated_sequence,
216 } => {
217 let new_version = Arc::new(
218 VersionBuilder::new(version.metadata.clone(), new_mutable)
219 .flushed_entry_id(truncated_entry_id)
220 .flushed_sequence(truncated_sequence)
221 .truncated_entry_id(Some(truncated_entry_id))
222 .build(),
223 );
224
225 let mut version_data = self.data.write().unwrap();
226 version_data.version.ssts.mark_all_deleted();
227 version_data.version = new_version;
228 }
229 TruncateKind::Partial { files_to_remove } => {
230 let new_version = Arc::new(
231 VersionBuilder::from_version(version)
232 .remove_files(files_to_remove.into_iter())
233 .build(),
234 );
235
236 let mut version_data = self.data.write().unwrap();
237 version_data.version = new_version;
239 }
240 };
241 }
242
243 pub(crate) fn overwrite_current(&self, version: VersionRef) {
245 let mut version_data = self.data.write().unwrap();
246 version_data.version = version;
247 }
248}
249
250pub(crate) type VersionControlRef = Arc<VersionControl>;
251
252#[derive(Debug, Clone)]
254pub(crate) struct VersionControlData {
255 pub(crate) version: VersionRef,
257 pub(crate) committed_sequence: SequenceNumber,
261 pub(crate) last_entry_id: EntryId,
265 pub(crate) is_dropped: bool,
267}
268
269impl VersionControlData {
270 pub(crate) fn series_count(&self) -> usize {
272 self.version.memtables.mutable.series_count()
273 }
274}
275
276#[derive(Clone, Debug)]
278pub(crate) struct Version {
279 pub(crate) metadata: RegionMetadataRef,
284 pub(crate) memtables: MemtableVersionRef,
288 pub(crate) ssts: SstVersionRef,
290 pub(crate) flushed_entry_id: EntryId,
292 pub(crate) flushed_sequence: SequenceNumber,
294 pub(crate) truncated_entry_id: Option<EntryId>,
298 pub(crate) compaction_time_window: Option<Duration>,
303 pub(crate) options: RegionOptions,
305}
306
307pub(crate) type VersionRef = Arc<Version>;
308
309pub(crate) struct VersionBuilder {
311 metadata: RegionMetadataRef,
312 memtables: MemtableVersionRef,
313 ssts: SstVersionRef,
314 flushed_entry_id: EntryId,
315 flushed_sequence: SequenceNumber,
316 truncated_entry_id: Option<EntryId>,
317 compaction_time_window: Option<Duration>,
318 options: RegionOptions,
319}
320
321impl VersionBuilder {
322 pub(crate) fn new(metadata: RegionMetadataRef, mutable: TimePartitionsRef) -> Self {
324 VersionBuilder {
325 metadata,
326 memtables: Arc::new(MemtableVersion::new(mutable)),
327 ssts: Arc::new(SstVersion::new()),
328 flushed_entry_id: 0,
329 flushed_sequence: 0,
330 truncated_entry_id: None,
331 compaction_time_window: None,
332 options: RegionOptions::default(),
333 }
334 }
335
336 pub(crate) fn from_version(version: VersionRef) -> Self {
338 VersionBuilder {
339 metadata: version.metadata.clone(),
340 memtables: version.memtables.clone(),
341 ssts: version.ssts.clone(),
342 flushed_entry_id: version.flushed_entry_id,
343 flushed_sequence: version.flushed_sequence,
344 truncated_entry_id: version.truncated_entry_id,
345 compaction_time_window: version.compaction_time_window,
346 options: version.options.clone(),
347 }
348 }
349
350 pub(crate) fn memtables(mut self, memtables: MemtableVersion) -> Self {
352 self.memtables = Arc::new(memtables);
353 self
354 }
355
356 pub(crate) fn metadata(mut self, metadata: RegionMetadataRef) -> Self {
358 self.metadata = metadata;
359 self
360 }
361
362 pub(crate) fn flushed_entry_id(mut self, entry_id: EntryId) -> Self {
364 self.flushed_entry_id = entry_id;
365 self
366 }
367
368 pub(crate) fn flushed_sequence(mut self, sequence: SequenceNumber) -> Self {
370 self.flushed_sequence = sequence;
371 self
372 }
373
374 pub(crate) fn truncated_entry_id(mut self, entry_id: Option<EntryId>) -> Self {
376 self.truncated_entry_id = entry_id;
377 self
378 }
379
380 pub(crate) fn compaction_time_window(mut self, window: Option<Duration>) -> Self {
382 self.compaction_time_window = window;
383 self
384 }
385
386 pub(crate) fn options(mut self, options: RegionOptions) -> Self {
388 self.options = options;
389 self
390 }
391
392 pub(crate) fn apply_edit(mut self, edit: RegionEdit, file_purger: FilePurgerRef) -> Self {
394 if let Some(entry_id) = edit.flushed_entry_id {
395 self.flushed_entry_id = self.flushed_entry_id.max(entry_id);
396 }
397 if let Some(sequence) = edit.flushed_sequence {
398 self.flushed_sequence = self.flushed_sequence.max(sequence);
399 }
400 if let Some(window) = edit.compaction_time_window {
401 self.compaction_time_window = Some(window);
402 }
403 if !edit.files_to_add.is_empty() || !edit.files_to_remove.is_empty() {
404 let mut ssts = (*self.ssts).clone();
405 ssts.add_files(file_purger, edit.files_to_add.into_iter());
406 ssts.remove_files(edit.files_to_remove.into_iter());
407 self.ssts = Arc::new(ssts);
408 }
409
410 self
411 }
412
413 pub(crate) fn remove_memtables(mut self, ids: &[MemtableId]) -> Self {
415 if !ids.is_empty() {
416 let mut memtables = (*self.memtables).clone();
417 memtables.remove_memtables(ids);
418 self.memtables = Arc::new(memtables);
419 }
420 self
421 }
422
423 pub(crate) fn add_files(
425 mut self,
426 file_purger: FilePurgerRef,
427 files: impl Iterator<Item = FileMeta>,
428 ) -> Self {
429 let mut ssts = (*self.ssts).clone();
430 ssts.add_files(file_purger, files);
431 self.ssts = Arc::new(ssts);
432
433 self
434 }
435
436 pub(crate) fn remove_files(mut self, files: impl Iterator<Item = FileMeta>) -> Self {
437 let mut ssts = (*self.ssts).clone();
438 ssts.remove_files(files);
439 self.ssts = Arc::new(ssts);
440
441 self
442 }
443
444 pub(crate) fn build(self) -> Version {
447 let compaction_time_window = self
448 .options
449 .compaction
450 .time_window()
451 .or(self.compaction_time_window);
452 if self.compaction_time_window.is_some()
453 && compaction_time_window != self.compaction_time_window
454 {
455 info!(
456 "VersionBuilder overwrites region compaction time window from {:?} to {:?}, region: {}",
457 self.compaction_time_window,
458 compaction_time_window,
459 self.metadata.region_id
460 );
461 }
462
463 Version {
464 metadata: self.metadata,
465 memtables: self.memtables,
466 ssts: self.ssts,
467 flushed_entry_id: self.flushed_entry_id,
468 flushed_sequence: self.flushed_sequence,
469 truncated_entry_id: self.truncated_entry_id,
470 compaction_time_window,
471 options: self.options,
472 }
473 }
474}