1use std::sync::{Arc, RwLock};
27use std::time::Duration;
28
29use common_telemetry::info;
30use store_api::metadata::RegionMetadataRef;
31use store_api::storage::SequenceNumber;
32
33use crate::error::Result;
34use crate::manifest::action::RegionEdit;
35use crate::memtable::time_partition::{TimePartitions, TimePartitionsRef};
36use crate::memtable::version::{MemtableVersion, MemtableVersionRef};
37use crate::memtable::{MemtableBuilderRef, MemtableId};
38use crate::region::options::RegionOptions;
39use crate::sst::file::FileMeta;
40use crate::sst::file_purger::FilePurgerRef;
41use crate::sst::version::{SstVersion, SstVersionRef};
42use crate::wal::EntryId;
43
44#[derive(Debug)]
49pub(crate) struct VersionControl {
50 data: RwLock<VersionControlData>,
51}
52
53impl VersionControl {
54 pub(crate) fn new(version: Version) -> VersionControl {
56 let (flushed_sequence, flushed_entry_id) =
58 (version.flushed_sequence, version.flushed_entry_id);
59 VersionControl {
60 data: RwLock::new(VersionControlData {
61 version: Arc::new(version),
62 committed_sequence: flushed_sequence,
63 last_entry_id: flushed_entry_id,
64 is_dropped: false,
65 }),
66 }
67 }
68
69 pub(crate) fn current(&self) -> VersionControlData {
71 self.data.read().unwrap().clone()
72 }
73
74 pub(crate) fn set_sequence_and_entry_id(&self, seq: SequenceNumber, entry_id: EntryId) {
76 let mut data = self.data.write().unwrap();
77 data.committed_sequence = seq;
78 data.last_entry_id = entry_id;
79 }
80
81 pub(crate) fn set_entry_id(&self, entry_id: EntryId) {
83 let mut data = self.data.write().unwrap();
84 data.last_entry_id = entry_id;
85 }
86
87 pub(crate) fn committed_sequence(&self) -> SequenceNumber {
89 self.data.read().unwrap().committed_sequence
90 }
91
92 pub(crate) fn freeze_mutable(&self) -> Result<()> {
94 let version = self.current().version;
95 let time_window = version.compaction_time_window;
96
97 let Some(new_memtables) = version
98 .memtables
99 .freeze_mutable(&version.metadata, time_window)?
100 else {
101 return Ok(());
102 };
103
104 let new_version = Arc::new(
106 VersionBuilder::from_version(version)
107 .memtables(new_memtables)
108 .build(),
109 );
110
111 let mut version_data = self.data.write().unwrap();
112 version_data.version = new_version;
113
114 Ok(())
115 }
116
117 pub(crate) fn alter_options(&self, options: RegionOptions) {
119 let version = self.current().version;
120 let new_version = Arc::new(
121 VersionBuilder::from_version(version)
122 .options(options)
123 .build(),
124 );
125 let mut version_data = self.data.write().unwrap();
126 version_data.version = new_version;
127 }
128
129 pub(crate) fn apply_edit(
131 &self,
132 edit: RegionEdit,
133 memtables_to_remove: &[MemtableId],
134 purger: FilePurgerRef,
135 ) {
136 let version = self.current().version;
137 let new_version = Arc::new(
138 VersionBuilder::from_version(version)
139 .apply_edit(edit, purger)
140 .remove_memtables(memtables_to_remove)
141 .build(),
142 );
143
144 let mut version_data = self.data.write().unwrap();
145 version_data.version = new_version;
146 }
147
148 pub(crate) fn mark_dropped(&self, memtable_builder: &MemtableBuilderRef) {
150 let version = self.current().version;
151 let part_duration = Some(version.memtables.mutable.part_duration());
152 let next_memtable_id = version.memtables.mutable.next_memtable_id();
153 let new_mutable = Arc::new(TimePartitions::new(
154 version.metadata.clone(),
155 memtable_builder.clone(),
156 next_memtable_id,
157 part_duration,
158 ));
159
160 let mut data = self.data.write().unwrap();
161 data.is_dropped = true;
162 data.version.ssts.mark_all_deleted();
163 let new_version =
165 Arc::new(VersionBuilder::new(version.metadata.clone(), new_mutable).build());
166 data.version = new_version;
167 }
168
169 pub(crate) fn alter_schema(&self, metadata: RegionMetadataRef, builder: &MemtableBuilderRef) {
174 let version = self.current().version;
175 let part_duration = Some(version.memtables.mutable.part_duration());
176 let next_memtable_id = version.memtables.mutable.next_memtable_id();
177 let new_mutable = Arc::new(TimePartitions::new(
178 metadata.clone(),
179 builder.clone(),
180 next_memtable_id,
181 part_duration,
182 ));
183 debug_assert!(version.memtables.mutable.is_empty());
184 debug_assert!(version.memtables.immutables().is_empty());
185 let new_version = Arc::new(
186 VersionBuilder::from_version(version)
187 .metadata(metadata)
188 .memtables(MemtableVersion::new(new_mutable))
189 .build(),
190 );
191
192 let mut version_data = self.data.write().unwrap();
193 version_data.version = new_version;
194 }
195
196 pub(crate) fn truncate(
198 &self,
199 truncated_entry_id: EntryId,
200 truncated_sequence: SequenceNumber,
201 memtable_builder: &MemtableBuilderRef,
202 ) {
203 let version = self.current().version;
204
205 let part_duration = version.memtables.mutable.part_duration();
206 let next_memtable_id = version.memtables.mutable.next_memtable_id();
207 let new_mutable = Arc::new(TimePartitions::new(
208 version.metadata.clone(),
209 memtable_builder.clone(),
210 next_memtable_id,
211 Some(part_duration),
212 ));
213 let new_version = Arc::new(
214 VersionBuilder::new(version.metadata.clone(), new_mutable)
215 .flushed_entry_id(truncated_entry_id)
216 .flushed_sequence(truncated_sequence)
217 .truncated_entry_id(Some(truncated_entry_id))
218 .build(),
219 );
220
221 let mut version_data = self.data.write().unwrap();
222 version_data.version.ssts.mark_all_deleted();
223 version_data.version = new_version;
224 }
225
226 pub(crate) fn overwrite_current(&self, version: VersionRef) {
228 let mut version_data = self.data.write().unwrap();
229 version_data.version = version;
230 }
231}
232
233pub(crate) type VersionControlRef = Arc<VersionControl>;
234
235#[derive(Debug, Clone)]
237pub(crate) struct VersionControlData {
238 pub(crate) version: VersionRef,
240 pub(crate) committed_sequence: SequenceNumber,
244 pub(crate) last_entry_id: EntryId,
248 pub(crate) is_dropped: bool,
250}
251
252impl VersionControlData {
253 pub(crate) fn series_count(&self) -> usize {
255 self.version.memtables.mutable.series_count()
256 }
257}
258
259#[derive(Clone, Debug)]
261pub(crate) struct Version {
262 pub(crate) metadata: RegionMetadataRef,
267 pub(crate) memtables: MemtableVersionRef,
271 pub(crate) ssts: SstVersionRef,
273 pub(crate) flushed_entry_id: EntryId,
275 pub(crate) flushed_sequence: SequenceNumber,
277 pub(crate) truncated_entry_id: Option<EntryId>,
281 pub(crate) compaction_time_window: Option<Duration>,
286 pub(crate) options: RegionOptions,
288}
289
290pub(crate) type VersionRef = Arc<Version>;
291
292pub(crate) struct VersionBuilder {
294 metadata: RegionMetadataRef,
295 memtables: MemtableVersionRef,
296 ssts: SstVersionRef,
297 flushed_entry_id: EntryId,
298 flushed_sequence: SequenceNumber,
299 truncated_entry_id: Option<EntryId>,
300 compaction_time_window: Option<Duration>,
301 options: RegionOptions,
302}
303
304impl VersionBuilder {
305 pub(crate) fn new(metadata: RegionMetadataRef, mutable: TimePartitionsRef) -> Self {
307 VersionBuilder {
308 metadata,
309 memtables: Arc::new(MemtableVersion::new(mutable)),
310 ssts: Arc::new(SstVersion::new()),
311 flushed_entry_id: 0,
312 flushed_sequence: 0,
313 truncated_entry_id: None,
314 compaction_time_window: None,
315 options: RegionOptions::default(),
316 }
317 }
318
319 pub(crate) fn from_version(version: VersionRef) -> Self {
321 VersionBuilder {
322 metadata: version.metadata.clone(),
323 memtables: version.memtables.clone(),
324 ssts: version.ssts.clone(),
325 flushed_entry_id: version.flushed_entry_id,
326 flushed_sequence: version.flushed_sequence,
327 truncated_entry_id: version.truncated_entry_id,
328 compaction_time_window: version.compaction_time_window,
329 options: version.options.clone(),
330 }
331 }
332
333 pub(crate) fn memtables(mut self, memtables: MemtableVersion) -> Self {
335 self.memtables = Arc::new(memtables);
336 self
337 }
338
339 pub(crate) fn metadata(mut self, metadata: RegionMetadataRef) -> Self {
341 self.metadata = metadata;
342 self
343 }
344
345 pub(crate) fn flushed_entry_id(mut self, entry_id: EntryId) -> Self {
347 self.flushed_entry_id = entry_id;
348 self
349 }
350
351 pub(crate) fn flushed_sequence(mut self, sequence: SequenceNumber) -> Self {
353 self.flushed_sequence = sequence;
354 self
355 }
356
357 pub(crate) fn truncated_entry_id(mut self, entry_id: Option<EntryId>) -> Self {
359 self.truncated_entry_id = entry_id;
360 self
361 }
362
363 pub(crate) fn compaction_time_window(mut self, window: Option<Duration>) -> Self {
365 self.compaction_time_window = window;
366 self
367 }
368
369 pub(crate) fn options(mut self, options: RegionOptions) -> Self {
371 self.options = options;
372 self
373 }
374
375 pub(crate) fn apply_edit(mut self, edit: RegionEdit, file_purger: FilePurgerRef) -> Self {
377 if let Some(entry_id) = edit.flushed_entry_id {
378 self.flushed_entry_id = self.flushed_entry_id.max(entry_id);
379 }
380 if let Some(sequence) = edit.flushed_sequence {
381 self.flushed_sequence = self.flushed_sequence.max(sequence);
382 }
383 if let Some(window) = edit.compaction_time_window {
384 self.compaction_time_window = Some(window);
385 }
386 if !edit.files_to_add.is_empty() || !edit.files_to_remove.is_empty() {
387 let mut ssts = (*self.ssts).clone();
388 ssts.add_files(file_purger, edit.files_to_add.into_iter());
389 ssts.remove_files(edit.files_to_remove.into_iter());
390 self.ssts = Arc::new(ssts);
391 }
392
393 self
394 }
395
396 pub(crate) fn remove_memtables(mut self, ids: &[MemtableId]) -> Self {
398 if !ids.is_empty() {
399 let mut memtables = (*self.memtables).clone();
400 memtables.remove_memtables(ids);
401 self.memtables = Arc::new(memtables);
402 }
403 self
404 }
405
406 pub(crate) fn add_files(
408 mut self,
409 file_purger: FilePurgerRef,
410 files: impl Iterator<Item = FileMeta>,
411 ) -> Self {
412 let mut ssts = (*self.ssts).clone();
413 ssts.add_files(file_purger, files);
414 self.ssts = Arc::new(ssts);
415
416 self
417 }
418
419 pub(crate) fn build(self) -> Version {
422 let compaction_time_window = self
423 .options
424 .compaction
425 .time_window()
426 .or(self.compaction_time_window);
427 if self.compaction_time_window.is_some()
428 && compaction_time_window != self.compaction_time_window
429 {
430 info!(
431 "VersionBuilder overwrites region compaction time window from {:?} to {:?}, region: {}",
432 self.compaction_time_window,
433 compaction_time_window,
434 self.metadata.region_id
435 );
436 }
437
438 Version {
439 metadata: self.metadata,
440 memtables: self.memtables,
441 ssts: self.ssts,
442 flushed_entry_id: self.flushed_entry_id,
443 flushed_sequence: self.flushed_sequence,
444 truncated_entry_id: self.truncated_entry_id,
445 compaction_time_window,
446 options: self.options,
447 }
448 }
449}