store_api/
region_engine.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Region Engine's definition
16
17use std::any::Any;
18use std::collections::HashMap;
19use std::fmt::{Debug, Display};
20use std::sync::{Arc, Mutex};
21
22use api::greptime_proto::v1::meta::{GrantedRegion as PbGrantedRegion, RegionRole as PbRegionRole};
23use api::region::RegionResponse;
24use async_trait::async_trait;
25use common_error::ext::{BoxedError, PlainError};
26use common_error::status_code::StatusCode;
27use common_recordbatch::SendableRecordBatchStream;
28use common_time::Timestamp;
29use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
30use datafusion_physical_plan::{DisplayAs, DisplayFormatType};
31use datatypes::schema::SchemaRef;
32use futures::future::join_all;
33use serde::{Deserialize, Serialize};
34use tokio::sync::Semaphore;
35
36use crate::logstore::entry;
37use crate::metadata::RegionMetadataRef;
38use crate::region_request::{
39    BatchRegionDdlRequest, RegionOpenRequest, RegionRequest, RegionSequencesRequest,
40};
41use crate::storage::{RegionId, ScanRequest, SequenceNumber};
42
/// The region role states that can be set explicitly on a region.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum SettableRegionRoleState {
    Follower,
    DowngradingLeader,
}

impl Display for SettableRegionRoleState {
    /// Writes the human-readable name of the state (formatter flags such as width are ignored).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Follower => "Follower",
            Self::DowngradingLeader => "Leader(Downgrading)",
        };
        f.write_str(label)
    }
}
58
59impl From<SettableRegionRoleState> for RegionRole {
60    fn from(value: SettableRegionRoleState) -> Self {
61        match value {
62            SettableRegionRoleState::Follower => RegionRole::Follower,
63            SettableRegionRoleState::DowngradingLeader => RegionRole::DowngradingLeader,
64        }
65    }
66}
67
/// The request to set region role state.
#[derive(Debug, PartialEq, Eq)]
pub struct SetRegionRoleStateRequest {
    /// The region whose role state should be changed.
    region_id: RegionId,
    /// The role state to switch the region to.
    region_role_state: SettableRegionRoleState,
}
74
/// The success response of setting region role state.
///
/// The variant determines which entry ids are reported (see the accessor methods).
#[derive(Debug, PartialEq, Eq)]
pub enum SetRegionRoleStateSuccess {
    /// Success for a file region; no entry id is reported.
    File,
    /// Success for a mito region.
    Mito {
        /// The last entry id of the region.
        last_entry_id: entry::Id,
    },
    /// Success for a metric region.
    Metric {
        /// The last entry id of the region.
        last_entry_id: entry::Id,
        /// The last entry id of the region's metadata.
        metadata_last_entry_id: entry::Id,
    },
}
87
88impl SetRegionRoleStateSuccess {
89    /// Returns a [SetRegionRoleStateSuccess::File].
90    pub fn file() -> Self {
91        Self::File
92    }
93
94    /// Returns a [SetRegionRoleStateSuccess::Mito] with the `last_entry_id`.
95    pub fn mito(last_entry_id: entry::Id) -> Self {
96        SetRegionRoleStateSuccess::Mito { last_entry_id }
97    }
98
99    /// Returns a [SetRegionRoleStateSuccess::Metric] with the `last_entry_id` and `metadata_last_entry_id`.
100    pub fn metric(last_entry_id: entry::Id, metadata_last_entry_id: entry::Id) -> Self {
101        SetRegionRoleStateSuccess::Metric {
102            last_entry_id,
103            metadata_last_entry_id,
104        }
105    }
106}
107
108impl SetRegionRoleStateSuccess {
109    /// Returns the last entry id of the region.
110    pub fn last_entry_id(&self) -> Option<entry::Id> {
111        match self {
112            SetRegionRoleStateSuccess::File => None,
113            SetRegionRoleStateSuccess::Mito { last_entry_id } => Some(*last_entry_id),
114            SetRegionRoleStateSuccess::Metric { last_entry_id, .. } => Some(*last_entry_id),
115        }
116    }
117
118    /// Returns the last entry id of the metadata of the region.
119    pub fn metadata_last_entry_id(&self) -> Option<entry::Id> {
120        match self {
121            SetRegionRoleStateSuccess::File => None,
122            SetRegionRoleStateSuccess::Mito { .. } => None,
123            SetRegionRoleStateSuccess::Metric {
124                metadata_last_entry_id,
125                ..
126            } => Some(*metadata_last_entry_id),
127        }
128    }
129}
130
/// The response of setting region role state.
#[derive(Debug, PartialEq, Eq)]
pub enum SetRegionRoleStateResponse {
    /// The role state was set; carries the engine-specific success details.
    Success(SetRegionRoleStateSuccess),
    /// The target region was not found.
    NotFound,
}
137
138impl SetRegionRoleStateResponse {
139    /// Returns a [SetRegionRoleStateResponse::Success] with the `File` success.
140    pub fn success(success: SetRegionRoleStateSuccess) -> Self {
141        Self::Success(success)
142    }
143
144    /// Returns true if the response is a [SetRegionRoleStateResponse::NotFound].
145    pub fn is_not_found(&self) -> bool {
146        matches!(self, SetRegionRoleStateResponse::NotFound)
147    }
148}
149
/// A region paired with the [RegionRole] granted to it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GrantedRegion {
    /// The id of the region.
    pub region_id: RegionId,
    /// The role granted to the region.
    pub region_role: RegionRole,
    /// Extension key/value pairs; copied unchanged to and from the protobuf
    /// [PbGrantedRegion].
    pub extensions: HashMap<String, Vec<u8>>,
}
156
157impl GrantedRegion {
158    pub fn new(region_id: RegionId, region_role: RegionRole) -> Self {
159        Self {
160            region_id,
161            region_role,
162            extensions: HashMap::new(),
163        }
164    }
165}
166
167impl From<GrantedRegion> for PbGrantedRegion {
168    fn from(value: GrantedRegion) -> Self {
169        PbGrantedRegion {
170            region_id: value.region_id.as_u64(),
171            role: PbRegionRole::from(value.region_role).into(),
172            extensions: value.extensions,
173        }
174    }
175}
176
177impl From<PbGrantedRegion> for GrantedRegion {
178    fn from(value: PbGrantedRegion) -> Self {
179        GrantedRegion {
180            region_id: RegionId::from_u64(value.region_id),
181            region_role: value.role().into(),
182            extensions: value.extensions,
183        }
184    }
185}
186
/// The role of the region.
/// TODO(weny): rename it to `RegionRoleState`
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum RegionRole {
    /// Readonly region (mito2).
    Follower,
    /// Writable region (mito2), readonly region (file).
    Leader,
    /// Leader is downgrading to follower.
    ///
    /// This state is used to prevent new write requests.
    DowngradingLeader,
}
200
201impl Display for RegionRole {
202    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
203        match self {
204            RegionRole::Follower => write!(f, "Follower"),
205            RegionRole::Leader => write!(f, "Leader"),
206            RegionRole::DowngradingLeader => write!(f, "Leader(Downgrading)"),
207        }
208    }
209}
210
211impl RegionRole {
212    pub fn writable(&self) -> bool {
213        matches!(self, RegionRole::Leader)
214    }
215}
216
217impl From<RegionRole> for PbRegionRole {
218    fn from(value: RegionRole) -> Self {
219        match value {
220            RegionRole::Follower => PbRegionRole::Follower,
221            RegionRole::Leader => PbRegionRole::Leader,
222            RegionRole::DowngradingLeader => PbRegionRole::DowngradingLeader,
223        }
224    }
225}
226
227impl From<PbRegionRole> for RegionRole {
228    fn from(value: PbRegionRole) -> Self {
229        match value {
230            PbRegionRole::Leader => RegionRole::Leader,
231            PbRegionRole::Follower => RegionRole::Follower,
232            PbRegionRole::DowngradingLeader => RegionRole::DowngradingLeader,
233        }
234    }
235}
236
/// Output partition properties of the [RegionScanner].
#[derive(Debug)]
pub enum ScannerPartitioning {
    /// Unknown partitioning scheme with a known number of partitions
    Unknown(usize),
}

impl ScannerPartitioning {
    /// Returns how many output partitions this partitioning describes.
    pub fn num_partitions(&self) -> usize {
        // Single-variant enum, so this pattern is irrefutable.
        let ScannerPartitioning::Unknown(count) = self;
        *count
    }
}
252
/// Represents one data range within a partition.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PartitionRange {
    /// Start time of time index column. Inclusive.
    pub start: Timestamp,
    /// End time of time index column. Exclusive.
    pub end: Timestamp,
    /// Number of rows in this range. Used to balance ranges between partitions.
    pub num_rows: usize,
    /// Identifier of this range. Assigned by the storage engine.
    pub identifier: usize,
}
265
/// Properties of the [RegionScanner].
#[derive(Debug, Default)]
pub struct ScannerProperties {
    /// A 2-dim partition ranges.
    ///
    /// The first dim vector's length represents the output partition number. The second
    /// dim is ranges within one partition.
    pub partitions: Vec<Vec<PartitionRange>>,

    /// Whether scanner is in append-only mode.
    append_mode: bool,

    /// Total rows that **may** be returned by the scanner. This field is only read iff
    /// [ScannerProperties::append_mode] is true.
    total_rows: usize,

    /// Whether to yield an empty batch to distinguish partition ranges.
    pub distinguish_partition_range: bool,

    /// The target partitions of the scanner. 0 indicates using the number of partitions as
    /// target partitions.
    target_partitions: usize,

    /// Whether the scanner is scanning a logical region.
    logical_region: bool,
}
291
292impl ScannerProperties {
293    /// Sets append mode for scanner.
294    pub fn with_append_mode(mut self, append_mode: bool) -> Self {
295        self.append_mode = append_mode;
296        self
297    }
298
299    /// Sets total rows for scanner.
300    pub fn with_total_rows(mut self, total_rows: usize) -> Self {
301        self.total_rows = total_rows;
302        self
303    }
304
305    /// Creates a new [`ScannerProperties`] with the given partitioning.
306    pub fn new(partitions: Vec<Vec<PartitionRange>>, append_mode: bool, total_rows: usize) -> Self {
307        Self {
308            partitions,
309            append_mode,
310            total_rows,
311            distinguish_partition_range: false,
312            target_partitions: 0,
313            logical_region: false,
314        }
315    }
316
317    /// Updates the properties with the given [PrepareRequest].
318    pub fn prepare(&mut self, request: PrepareRequest) {
319        if let Some(ranges) = request.ranges {
320            self.partitions = ranges;
321        }
322        if let Some(distinguish_partition_range) = request.distinguish_partition_range {
323            self.distinguish_partition_range = distinguish_partition_range;
324        }
325        if let Some(target_partitions) = request.target_partitions {
326            self.target_partitions = target_partitions;
327        }
328    }
329
330    /// Returns the number of actual partitions.
331    pub fn num_partitions(&self) -> usize {
332        self.partitions.len()
333    }
334
335    pub fn append_mode(&self) -> bool {
336        self.append_mode
337    }
338
339    pub fn total_rows(&self) -> usize {
340        self.total_rows
341    }
342
343    /// Returns whether the scanner is scanning a logical region.
344    pub fn is_logical_region(&self) -> bool {
345        self.logical_region
346    }
347
348    /// Returns the target partitions of the scanner. If it is not set, returns the number of partitions.
349    pub fn target_partitions(&self) -> usize {
350        if self.target_partitions == 0 {
351            self.num_partitions()
352        } else {
353            self.target_partitions
354        }
355    }
356
357    /// Sets whether the scanner is reading a logical region.
358    pub fn set_logical_region(&mut self, logical_region: bool) {
359        self.logical_region = logical_region;
360    }
361}
362
/// Request to override the scanner properties.
///
/// Every field is optional; `None` leaves the corresponding scanner property unchanged.
#[derive(Default)]
pub struct PrepareRequest {
    /// Assigned partition ranges.
    pub ranges: Option<Vec<Vec<PartitionRange>>>,
    /// Distinguishes partition ranges by yielding empty batches between them.
    pub distinguish_partition_range: Option<bool>,
    /// The expected number of target partitions.
    pub target_partitions: Option<usize>,
}
373
374impl PrepareRequest {
375    /// Sets the ranges.
376    pub fn with_ranges(mut self, ranges: Vec<Vec<PartitionRange>>) -> Self {
377        self.ranges = Some(ranges);
378        self
379    }
380
381    /// Sets the distinguish partition range flag.
382    pub fn with_distinguish_partition_range(mut self, distinguish_partition_range: bool) -> Self {
383        self.distinguish_partition_range = Some(distinguish_partition_range);
384        self
385    }
386
387    /// Sets the target partitions.
388    pub fn with_target_partitions(mut self, target_partitions: usize) -> Self {
389        self.target_partitions = Some(target_partitions);
390        self
391    }
392}
393
394/// A scanner that provides a way to scan the region concurrently.
395///
396/// The scanner splits the region into partitions so that each partition can be scanned concurrently.
397/// You can use this trait to implement an [`ExecutionPlan`](datafusion_physical_plan::ExecutionPlan).
398pub trait RegionScanner: Debug + DisplayAs + Send {
399    /// Returns the properties of the scanner.
400    fn properties(&self) -> &ScannerProperties;
401
402    /// Returns the schema of the record batches.
403    fn schema(&self) -> SchemaRef;
404
405    /// Returns the metadata of the region.
406    fn metadata(&self) -> RegionMetadataRef;
407
408    /// Prepares the scanner with the given partition ranges.
409    ///
410    /// This method is for the planner to adjust the scanner's behavior based on the partition ranges.
411    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError>;
412
413    /// Scans the partition and returns a stream of record batches.
414    ///
415    /// # Panics
416    /// Panics if the `partition` is out of bound.
417    fn scan_partition(
418        &self,
419        metrics_set: &ExecutionPlanMetricsSet,
420        partition: usize,
421    ) -> Result<SendableRecordBatchStream, BoxedError>;
422
423    /// Check if there is any predicate that may be executed in this scanner.
424    fn has_predicate(&self) -> bool;
425
426    /// Sets whether the scanner is reading a logical region.
427    fn set_logical_region(&mut self, logical_region: bool);
428}
429
/// Boxed, dynamically dispatched [RegionScanner], as returned by [RegionEngine::handle_query].
pub type RegionScannerRef = Box<dyn RegionScanner>;

/// Per-region results of a batch operation such as [RegionEngine::handle_batch_open_requests].
pub type BatchResponses = Vec<(RegionId, Result<RegionResponse, BoxedError>)>;
433
/// Represents the statistics of a region.
///
/// Fields marked `#[serde(default)]` may be absent from the serialized form.
#[derive(Debug, Deserialize, Serialize, Default)]
pub struct RegionStatistic {
    /// The number of rows.
    #[serde(default)]
    pub num_rows: u64,
    /// The size of memtable in bytes.
    pub memtable_size: u64,
    /// The size of WAL in bytes.
    pub wal_size: u64,
    /// The size of manifest in bytes.
    pub manifest_size: u64,
    /// The size of SST data files in bytes.
    pub sst_size: u64,
    /// The size of SST index files in bytes.
    #[serde(default)]
    pub index_size: u64,
    /// The manifest info (versions and flushed entry ids) of the region.
    #[serde(default)]
    pub manifest: RegionManifestInfo,
    /// The latest entry id of the region's remote WAL since last flush.
    /// For the metric engine there are two latest entry ids: one for data and one for metadata.
    /// TODO(weny): remove these two fields and use a single one instead.
    #[serde(default)]
    pub data_topic_latest_entry_id: u64,
    /// The latest entry id of the metadata topic; the second of the two ids described on
    /// `data_topic_latest_entry_id`.
    #[serde(default)]
    pub metadata_topic_latest_entry_id: u64,
}
462
/// The manifest info of a region.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RegionManifestInfo {
    /// Manifest info of a mito2 region.
    Mito {
        /// The manifest version of the region.
        manifest_version: u64,
        /// The flushed entry id of the region.
        flushed_entry_id: u64,
    },
    /// Manifest info of a metric region, which has a data region and a metadata region.
    Metric {
        /// The manifest version of the data region.
        data_manifest_version: u64,
        /// The flushed entry id of the data region.
        data_flushed_entry_id: u64,
        /// The manifest version of the metadata region.
        metadata_manifest_version: u64,
        /// The flushed entry id of the metadata region.
        metadata_flushed_entry_id: u64,
    },
}
477
478impl RegionManifestInfo {
479    /// Creates a new [RegionManifestInfo] for mito2 engine.
480    pub fn mito(manifest_version: u64, flushed_entry_id: u64) -> Self {
481        Self::Mito {
482            manifest_version,
483            flushed_entry_id,
484        }
485    }
486
487    /// Creates a new [RegionManifestInfo] for metric engine.
488    pub fn metric(
489        data_manifest_version: u64,
490        data_flushed_entry_id: u64,
491        metadata_manifest_version: u64,
492        metadata_flushed_entry_id: u64,
493    ) -> Self {
494        Self::Metric {
495            data_manifest_version,
496            data_flushed_entry_id,
497            metadata_manifest_version,
498            metadata_flushed_entry_id,
499        }
500    }
501
502    /// Returns true if the region is a mito2 region.
503    pub fn is_mito(&self) -> bool {
504        matches!(self, RegionManifestInfo::Mito { .. })
505    }
506
507    /// Returns true if the region is a metric region.
508    pub fn is_metric(&self) -> bool {
509        matches!(self, RegionManifestInfo::Metric { .. })
510    }
511
512    /// Returns the flushed entry id of the data region.
513    pub fn data_flushed_entry_id(&self) -> u64 {
514        match self {
515            RegionManifestInfo::Mito {
516                flushed_entry_id, ..
517            } => *flushed_entry_id,
518            RegionManifestInfo::Metric {
519                data_flushed_entry_id,
520                ..
521            } => *data_flushed_entry_id,
522        }
523    }
524
525    /// Returns the manifest version of the data region.
526    pub fn data_manifest_version(&self) -> u64 {
527        match self {
528            RegionManifestInfo::Mito {
529                manifest_version, ..
530            } => *manifest_version,
531            RegionManifestInfo::Metric {
532                data_manifest_version,
533                ..
534            } => *data_manifest_version,
535        }
536    }
537
538    /// Returns the manifest version of the metadata region.
539    pub fn metadata_manifest_version(&self) -> Option<u64> {
540        match self {
541            RegionManifestInfo::Mito { .. } => None,
542            RegionManifestInfo::Metric {
543                metadata_manifest_version,
544                ..
545            } => Some(*metadata_manifest_version),
546        }
547    }
548
549    /// Returns the flushed entry id of the metadata region.
550    pub fn metadata_flushed_entry_id(&self) -> Option<u64> {
551        match self {
552            RegionManifestInfo::Mito { .. } => None,
553            RegionManifestInfo::Metric {
554                metadata_flushed_entry_id,
555                ..
556            } => Some(*metadata_flushed_entry_id),
557        }
558    }
559
560    /// Encodes a list of ([RegionId], [RegionManifestInfo]) to a byte array.
561    pub fn encode_list(manifest_infos: &[(RegionId, Self)]) -> serde_json::Result<Vec<u8>> {
562        serde_json::to_vec(manifest_infos)
563    }
564
565    /// Decodes a list of ([RegionId], [RegionManifestInfo]) from a byte array.
566    pub fn decode_list(value: &[u8]) -> serde_json::Result<Vec<(RegionId, Self)>> {
567        serde_json::from_slice(value)
568    }
569}
570
571impl Default for RegionManifestInfo {
572    fn default() -> Self {
573        Self::Mito {
574            manifest_version: 0,
575            flushed_entry_id: 0,
576        }
577    }
578}
579
580impl RegionStatistic {
581    /// Deserializes the region statistic to a byte array.
582    ///
583    /// Returns None if the deserialization fails.
584    pub fn deserialize_from_slice(value: &[u8]) -> Option<RegionStatistic> {
585        serde_json::from_slice(value).ok()
586    }
587
588    /// Serializes the region statistic to a byte array.
589    ///
590    /// Returns None if the serialization fails.
591    pub fn serialize_to_vec(&self) -> Option<Vec<u8>> {
592        serde_json::to_vec(self).ok()
593    }
594}
595
596impl RegionStatistic {
597    /// Returns the estimated disk size of the region.
598    pub fn estimated_disk_size(&self) -> u64 {
599        self.wal_size + self.sst_size + self.manifest_size + self.index_size
600    }
601}
602
/// The response of syncing the manifest.
#[derive(Debug)]
pub enum SyncManifestResponse {
    /// The engine does not support syncing the manifest.
    NotSupported,
    /// Result of syncing a mito2 region.
    Mito {
        /// Indicates if the data region was synced.
        synced: bool,
    },
    /// Result of syncing a metric region.
    Metric {
        /// Indicates if the metadata region was synced.
        metadata_synced: bool,
        /// Indicates if the data region was synced.
        data_synced: bool,
        /// The logical regions that were newly opened during the sync operation.
        /// This only occurs after the metadata region has been successfully synced.
        new_opened_logical_region_ids: Vec<RegionId>,
    },
}
621
622impl SyncManifestResponse {
623    /// Returns true if data region is synced.
624    pub fn is_data_synced(&self) -> bool {
625        match self {
626            SyncManifestResponse::NotSupported => false,
627            SyncManifestResponse::Mito { synced } => *synced,
628            SyncManifestResponse::Metric { data_synced, .. } => *data_synced,
629        }
630    }
631
632    /// Returns true if the engine is supported the sync operation.
633    pub fn is_supported(&self) -> bool {
634        matches!(self, SyncManifestResponse::NotSupported)
635    }
636
637    /// Returns true if the engine is a mito2 engine.
638    pub fn is_mito(&self) -> bool {
639        matches!(self, SyncManifestResponse::Mito { .. })
640    }
641
642    /// Returns true if the engine is a metric engine.
643    pub fn is_metric(&self) -> bool {
644        matches!(self, SyncManifestResponse::Metric { .. })
645    }
646
647    /// Returns the new opened logical region ids.
648    pub fn new_opened_logical_region_ids(self) -> Option<Vec<RegionId>> {
649        match self {
650            SyncManifestResponse::Metric {
651                new_opened_logical_region_ids,
652                ..
653            } => Some(new_opened_logical_region_ids),
654            _ => None,
655        }
656    }
657}
658
659#[async_trait]
660pub trait RegionEngine: Send + Sync {
661    /// Name of this engine
662    fn name(&self) -> &str;
663
664    /// Handles batch open region requests.
665    async fn handle_batch_open_requests(
666        &self,
667        parallelism: usize,
668        requests: Vec<(RegionId, RegionOpenRequest)>,
669    ) -> Result<BatchResponses, BoxedError> {
670        let semaphore = Arc::new(Semaphore::new(parallelism));
671        let mut tasks = Vec::with_capacity(requests.len());
672
673        for (region_id, request) in requests {
674            let semaphore_moved = semaphore.clone();
675
676            tasks.push(async move {
677                // Safety: semaphore must exist
678                let _permit = semaphore_moved.acquire().await.unwrap();
679                let result = self
680                    .handle_request(region_id, RegionRequest::Open(request))
681                    .await;
682                (region_id, result)
683            });
684        }
685
686        Ok(join_all(tasks).await)
687    }
688
689    async fn handle_batch_ddl_requests(
690        &self,
691        request: BatchRegionDdlRequest,
692    ) -> Result<RegionResponse, BoxedError> {
693        let requests = request.into_region_requests();
694
695        let mut affected_rows = 0;
696        let mut extensions = HashMap::new();
697
698        for (region_id, request) in requests {
699            let result = self.handle_request(region_id, request).await?;
700            affected_rows += result.affected_rows;
701            extensions.extend(result.extensions);
702        }
703
704        Ok(RegionResponse {
705            affected_rows,
706            extensions,
707        })
708    }
709
710    /// Handles non-query request to the region. Returns the count of affected rows.
711    async fn handle_request(
712        &self,
713        region_id: RegionId,
714        request: RegionRequest,
715    ) -> Result<RegionResponse, BoxedError>;
716
717    /// Returns the last sequence number of the region.
718    async fn get_last_seq_num(
719        &self,
720        region_id: RegionId,
721    ) -> Result<Option<SequenceNumber>, BoxedError>;
722
723    async fn get_region_sequences(
724        &self,
725        seqs: RegionSequencesRequest,
726    ) -> Result<HashMap<u64, u64>, BoxedError> {
727        let mut results = HashMap::with_capacity(seqs.region_ids.len());
728
729        for region_id in seqs.region_ids {
730            let seq = self.get_last_seq_num(region_id).await?.unwrap_or_default();
731            results.insert(region_id.as_u64(), seq);
732        }
733
734        Ok(results)
735    }
736
737    /// Handles query and return a scanner that can be used to scan the region concurrently.
738    async fn handle_query(
739        &self,
740        region_id: RegionId,
741        request: ScanRequest,
742    ) -> Result<RegionScannerRef, BoxedError>;
743
744    /// Retrieves region's metadata.
745    async fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef, BoxedError>;
746
747    /// Retrieves region's statistic.
748    fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic>;
749
750    /// Stops the engine
751    async fn stop(&self) -> Result<(), BoxedError>;
752
753    /// Sets [RegionRole] for a region.
754    ///
755    /// The engine checks whether the region is writable before writing to the region. Setting
756    /// the region as readonly doesn't guarantee that write operations in progress will not
757    /// take effect.
758    fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError>;
759
760    /// Syncs the region manifest to the given manifest version.
761    async fn sync_region(
762        &self,
763        region_id: RegionId,
764        manifest_info: RegionManifestInfo,
765    ) -> Result<SyncManifestResponse, BoxedError>;
766
767    /// Sets region role state gracefully.
768    ///
769    /// After the call returns, the engine ensures no more write operations will succeed in the region.
770    async fn set_region_role_state_gracefully(
771        &self,
772        region_id: RegionId,
773        region_role_state: SettableRegionRoleState,
774    ) -> Result<SetRegionRoleStateResponse, BoxedError>;
775
776    /// Indicates region role.
777    ///
778    /// Returns the `None` if the region is not found.
779    fn role(&self, region_id: RegionId) -> Option<RegionRole>;
780
781    fn as_any(&self) -> &dyn Any;
782}
783
/// Shared handle to a [RegionEngine]; the trait requires `Send + Sync`, so it can be shared across threads.
pub type RegionEngineRef = Arc<dyn RegionEngine>;
785
/// A [RegionScanner] that only scans a single partition.
pub struct SinglePartitionScanner {
    /// The wrapped stream. It is taken by the first `scan_partition` call, so the scanner can
    /// only be scanned once.
    stream: Mutex<Option<SendableRecordBatchStream>>,
    /// Schema of the wrapped stream, captured at construction.
    schema: SchemaRef,
    /// Scanner properties; adjusted by `prepare` and `set_logical_region`.
    properties: ScannerProperties,
    /// Metadata of the scanned region.
    metadata: RegionMetadataRef,
}
793
794impl SinglePartitionScanner {
795    /// Creates a new [SinglePartitionScanner] with the given stream and metadata.
796    pub fn new(
797        stream: SendableRecordBatchStream,
798        append_mode: bool,
799        metadata: RegionMetadataRef,
800    ) -> Self {
801        let schema = stream.schema();
802        Self {
803            stream: Mutex::new(Some(stream)),
804            schema,
805            properties: ScannerProperties::default().with_append_mode(append_mode),
806            metadata,
807        }
808    }
809}
810
811impl Debug for SinglePartitionScanner {
812    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
813        write!(f, "SinglePartitionScanner: <SendableRecordBatchStream>")
814    }
815}
816
817impl RegionScanner for SinglePartitionScanner {
818    fn properties(&self) -> &ScannerProperties {
819        &self.properties
820    }
821
822    fn schema(&self) -> SchemaRef {
823        self.schema.clone()
824    }
825
826    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
827        self.properties.prepare(request);
828        Ok(())
829    }
830
831    fn scan_partition(
832        &self,
833        _metrics_set: &ExecutionPlanMetricsSet,
834        _partition: usize,
835    ) -> Result<SendableRecordBatchStream, BoxedError> {
836        let mut stream = self.stream.lock().unwrap();
837        stream.take().ok_or_else(|| {
838            BoxedError::new(PlainError::new(
839                "Not expected to run ExecutionPlan more than once".to_string(),
840                StatusCode::Unexpected,
841            ))
842        })
843    }
844
845    fn has_predicate(&self) -> bool {
846        false
847    }
848
849    fn metadata(&self) -> RegionMetadataRef {
850        self.metadata.clone()
851    }
852
853    fn set_logical_region(&mut self, logical_region: bool) {
854        self.properties.set_logical_region(logical_region);
855    }
856}
857
858impl DisplayAs for SinglePartitionScanner {
859    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
860        write!(f, "{:?}", self)
861    }
862}