store_api/
region_engine.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Region Engine's definition
16
17use std::any::Any;
18use std::collections::HashMap;
19use std::fmt::{Debug, Display};
20use std::sync::{Arc, Mutex};
21
22use api::greptime_proto::v1::meta::{GrantedRegion as PbGrantedRegion, RegionRole as PbRegionRole};
23use api::region::RegionResponse;
24use async_trait::async_trait;
25use common_error::ext::BoxedError;
26use common_recordbatch::{EmptyRecordBatchStream, SendableRecordBatchStream};
27use common_time::Timestamp;
28use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
29use datafusion_physical_plan::{DisplayAs, DisplayFormatType};
30use datatypes::schema::SchemaRef;
31use futures::future::join_all;
32use serde::{Deserialize, Serialize};
33use tokio::sync::Semaphore;
34
35use crate::logstore::entry;
36use crate::metadata::RegionMetadataRef;
37use crate::region_request::{BatchRegionDdlRequest, RegionOpenRequest, RegionRequest};
38use crate::storage::{RegionId, ScanRequest, SequenceNumber};
39
/// A region role state that can be requested through
/// [`RegionEngine::set_region_role_state_gracefully`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SettableRegionRoleState {
    /// Converted into [`RegionRole::Follower`].
    Follower,
    /// Converted into [`RegionRole::DowngradingLeader`].
    DowngradingLeader,
    /// Exit staging mode and return to normal leader state. Only allowed from staging state.
    Leader,
    /// Enter staging mode. Region remains writable but disables checkpoint and compaction.
    /// Only allowed from normal leader state.
    StagingLeader,
}

impl Display for SettableRegionRoleState {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Both leader sub-states are rendered as `Leader(<sub-state>)`.
        let label = match self {
            Self::Follower => "Follower",
            Self::DowngradingLeader => "Leader(Downgrading)",
            Self::Leader => "Leader",
            Self::StagingLeader => "Leader(Staging)",
        };
        f.write_str(label)
    }
}
61
62impl From<SettableRegionRoleState> for RegionRole {
63    fn from(value: SettableRegionRoleState) -> Self {
64        match value {
65            SettableRegionRoleState::Follower => RegionRole::Follower,
66            SettableRegionRoleState::DowngradingLeader => RegionRole::DowngradingLeader,
67            SettableRegionRoleState::Leader => RegionRole::Leader,
68            SettableRegionRoleState::StagingLeader => RegionRole::Leader, // Still a leader role
69        }
70    }
71}
72
73/// The request to set region role state.
74#[derive(Debug, PartialEq, Eq)]
75pub struct SetRegionRoleStateRequest {
76    region_id: RegionId,
77    region_role_state: SettableRegionRoleState,
78}
79
80/// The success response of setting region role state.
81#[derive(Debug, PartialEq, Eq)]
82pub enum SetRegionRoleStateSuccess {
83    File,
84    Mito {
85        last_entry_id: entry::Id,
86    },
87    Metric {
88        last_entry_id: entry::Id,
89        metadata_last_entry_id: entry::Id,
90    },
91}
92
93impl SetRegionRoleStateSuccess {
94    /// Returns a [SetRegionRoleStateSuccess::File].
95    pub fn file() -> Self {
96        Self::File
97    }
98
99    /// Returns a [SetRegionRoleStateSuccess::Mito] with the `last_entry_id`.
100    pub fn mito(last_entry_id: entry::Id) -> Self {
101        SetRegionRoleStateSuccess::Mito { last_entry_id }
102    }
103
104    /// Returns a [SetRegionRoleStateSuccess::Metric] with the `last_entry_id` and `metadata_last_entry_id`.
105    pub fn metric(last_entry_id: entry::Id, metadata_last_entry_id: entry::Id) -> Self {
106        SetRegionRoleStateSuccess::Metric {
107            last_entry_id,
108            metadata_last_entry_id,
109        }
110    }
111}
112
113impl SetRegionRoleStateSuccess {
114    /// Returns the last entry id of the region.
115    pub fn last_entry_id(&self) -> Option<entry::Id> {
116        match self {
117            SetRegionRoleStateSuccess::File => None,
118            SetRegionRoleStateSuccess::Mito { last_entry_id } => Some(*last_entry_id),
119            SetRegionRoleStateSuccess::Metric { last_entry_id, .. } => Some(*last_entry_id),
120        }
121    }
122
123    /// Returns the last entry id of the metadata of the region.
124    pub fn metadata_last_entry_id(&self) -> Option<entry::Id> {
125        match self {
126            SetRegionRoleStateSuccess::File => None,
127            SetRegionRoleStateSuccess::Mito { .. } => None,
128            SetRegionRoleStateSuccess::Metric {
129                metadata_last_entry_id,
130                ..
131            } => Some(*metadata_last_entry_id),
132        }
133    }
134}
135
136/// The response of setting region role state.
137#[derive(Debug)]
138pub enum SetRegionRoleStateResponse {
139    Success(SetRegionRoleStateSuccess),
140    NotFound,
141    InvalidTransition(BoxedError),
142}
143
144impl SetRegionRoleStateResponse {
145    /// Returns a [SetRegionRoleStateResponse::Success] with the `File` success.
146    pub fn success(success: SetRegionRoleStateSuccess) -> Self {
147        Self::Success(success)
148    }
149
150    /// Returns a [SetRegionRoleStateResponse::InvalidTransition] with the error.
151    pub fn invalid_transition(error: BoxedError) -> Self {
152        Self::InvalidTransition(error)
153    }
154
155    /// Returns true if the response is a [SetRegionRoleStateResponse::NotFound].
156    pub fn is_not_found(&self) -> bool {
157        matches!(self, SetRegionRoleStateResponse::NotFound)
158    }
159
160    /// Returns true if the response is a [SetRegionRoleStateResponse::InvalidTransition].
161    pub fn is_invalid_transition(&self) -> bool {
162        matches!(self, SetRegionRoleStateResponse::InvalidTransition(_))
163    }
164}
165
166#[derive(Debug, Clone, PartialEq, Eq)]
167pub struct GrantedRegion {
168    pub region_id: RegionId,
169    pub region_role: RegionRole,
170    pub extensions: HashMap<String, Vec<u8>>,
171}
172
173impl GrantedRegion {
174    pub fn new(region_id: RegionId, region_role: RegionRole) -> Self {
175        Self {
176            region_id,
177            region_role,
178            extensions: HashMap::new(),
179        }
180    }
181}
182
183impl From<GrantedRegion> for PbGrantedRegion {
184    fn from(value: GrantedRegion) -> Self {
185        PbGrantedRegion {
186            region_id: value.region_id.as_u64(),
187            role: PbRegionRole::from(value.region_role).into(),
188            extensions: value.extensions,
189        }
190    }
191}
192
193impl From<PbGrantedRegion> for GrantedRegion {
194    fn from(value: PbGrantedRegion) -> Self {
195        GrantedRegion {
196            region_id: RegionId::from_u64(value.region_id),
197            region_role: value.role().into(),
198            extensions: value.extensions,
199        }
200    }
201}
202
203/// The role of the region.
204/// TODO(weny): rename it to `RegionRoleState`
205#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
206pub enum RegionRole {
207    // Readonly region(mito2)
208    Follower,
209    // Writable region(mito2), Readonly region(file).
210    Leader,
211    // Leader is downgrading to follower.
212    //
213    // This state is used to prevent new write requests.
214    DowngradingLeader,
215}
216
217impl Display for RegionRole {
218    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
219        match self {
220            RegionRole::Follower => write!(f, "Follower"),
221            RegionRole::Leader => write!(f, "Leader"),
222            RegionRole::DowngradingLeader => write!(f, "Leader(Downgrading)"),
223        }
224    }
225}
226
227impl RegionRole {
228    pub fn writable(&self) -> bool {
229        matches!(self, RegionRole::Leader)
230    }
231}
232
233impl From<RegionRole> for PbRegionRole {
234    fn from(value: RegionRole) -> Self {
235        match value {
236            RegionRole::Follower => PbRegionRole::Follower,
237            RegionRole::Leader => PbRegionRole::Leader,
238            RegionRole::DowngradingLeader => PbRegionRole::DowngradingLeader,
239        }
240    }
241}
242
243impl From<PbRegionRole> for RegionRole {
244    fn from(value: PbRegionRole) -> Self {
245        match value {
246            PbRegionRole::Leader => RegionRole::Leader,
247            PbRegionRole::Follower => RegionRole::Follower,
248            PbRegionRole::DowngradingLeader => RegionRole::DowngradingLeader,
249        }
250    }
251}
252
/// Output partition properties of the [RegionScanner].
#[derive(Debug)]
pub enum ScannerPartitioning {
    /// Unknown partitioning scheme with a known number of partitions.
    Unknown(usize),
}

impl ScannerPartitioning {
    /// Returns the number of partitions.
    pub fn num_partitions(&self) -> usize {
        // Single-variant enum, so this pattern is irrefutable.
        let ScannerPartitioning::Unknown(count) = self;
        *count
    }
}
268
269/// Represents one data range within a partition
270#[derive(Debug, Clone, Copy, PartialEq, Eq)]
271pub struct PartitionRange {
272    /// Start time of time index column. Inclusive.
273    pub start: Timestamp,
274    /// End time of time index column. Exclusive.
275    pub end: Timestamp,
276    /// Number of rows in this range. Is used to balance ranges between partitions.
277    pub num_rows: usize,
278    /// Identifier to this range. Assigned by storage engine.
279    pub identifier: usize,
280}
281
282/// Properties of the [RegionScanner].
283#[derive(Debug, Default)]
284pub struct ScannerProperties {
285    /// A 2-dim partition ranges.
286    ///
287    /// The first dim vector's length represents the output partition number. The second
288    /// dim is ranges within one partition.
289    pub partitions: Vec<Vec<PartitionRange>>,
290
291    /// Whether scanner is in append-only mode.
292    append_mode: bool,
293
294    /// Total rows that **may** return by scanner. This field is only read iff
295    /// [ScannerProperties::append_mode] is true.
296    total_rows: usize,
297
298    /// Whether to yield an empty batch to distinguish partition ranges.
299    pub distinguish_partition_range: bool,
300
301    /// The target partitions of the scanner. 0 indicates using the number of partitions as target partitions.
302    target_partitions: usize,
303
304    /// Whether the scanner is scanning a logical region.
305    logical_region: bool,
306}
307
308impl ScannerProperties {
309    /// Sets append mode for scanner.
310    pub fn with_append_mode(mut self, append_mode: bool) -> Self {
311        self.append_mode = append_mode;
312        self
313    }
314
315    /// Sets total rows for scanner.
316    pub fn with_total_rows(mut self, total_rows: usize) -> Self {
317        self.total_rows = total_rows;
318        self
319    }
320
321    /// Creates a new [`ScannerProperties`] with the given partitioning.
322    pub fn new(partitions: Vec<Vec<PartitionRange>>, append_mode: bool, total_rows: usize) -> Self {
323        Self {
324            partitions,
325            append_mode,
326            total_rows,
327            distinguish_partition_range: false,
328            target_partitions: 0,
329            logical_region: false,
330        }
331    }
332
333    /// Updates the properties with the given [PrepareRequest].
334    pub fn prepare(&mut self, request: PrepareRequest) {
335        if let Some(ranges) = request.ranges {
336            self.partitions = ranges;
337        }
338        if let Some(distinguish_partition_range) = request.distinguish_partition_range {
339            self.distinguish_partition_range = distinguish_partition_range;
340        }
341        if let Some(target_partitions) = request.target_partitions {
342            self.target_partitions = target_partitions;
343        }
344    }
345
346    /// Returns the number of actual partitions.
347    pub fn num_partitions(&self) -> usize {
348        self.partitions.len()
349    }
350
351    pub fn append_mode(&self) -> bool {
352        self.append_mode
353    }
354
355    pub fn total_rows(&self) -> usize {
356        self.total_rows
357    }
358
359    /// Returns whether the scanner is scanning a logical region.
360    pub fn is_logical_region(&self) -> bool {
361        self.logical_region
362    }
363
364    /// Returns the target partitions of the scanner. If it is not set, returns the number of partitions.
365    pub fn target_partitions(&self) -> usize {
366        if self.target_partitions == 0 {
367            self.num_partitions()
368        } else {
369            self.target_partitions
370        }
371    }
372
373    /// Sets whether the scanner is reading a logical region.
374    pub fn set_logical_region(&mut self, logical_region: bool) {
375        self.logical_region = logical_region;
376    }
377}
378
379/// Request to override the scanner properties.
380#[derive(Default)]
381pub struct PrepareRequest {
382    /// Assigned partition ranges.
383    pub ranges: Option<Vec<Vec<PartitionRange>>>,
384    /// Distringuishes partition range by empty batches.
385    pub distinguish_partition_range: Option<bool>,
386    /// The expected number of target partitions.
387    pub target_partitions: Option<usize>,
388}
389
390impl PrepareRequest {
391    /// Sets the ranges.
392    pub fn with_ranges(mut self, ranges: Vec<Vec<PartitionRange>>) -> Self {
393        self.ranges = Some(ranges);
394        self
395    }
396
397    /// Sets the distinguish partition range flag.
398    pub fn with_distinguish_partition_range(mut self, distinguish_partition_range: bool) -> Self {
399        self.distinguish_partition_range = Some(distinguish_partition_range);
400        self
401    }
402
403    /// Sets the target partitions.
404    pub fn with_target_partitions(mut self, target_partitions: usize) -> Self {
405        self.target_partitions = Some(target_partitions);
406        self
407    }
408}
409
/// Query context that the scanner needs.
#[derive(Default, Clone)]
pub struct QueryScanContext {
    /// Whether the query is `EXPLAIN ANALYZE VERBOSE`.
    pub explain_verbose: bool,
}
416
417/// A scanner that provides a way to scan the region concurrently.
418///
419/// The scanner splits the region into partitions so that each partition can be scanned concurrently.
420/// You can use this trait to implement an [`ExecutionPlan`](datafusion_physical_plan::ExecutionPlan).
421pub trait RegionScanner: Debug + DisplayAs + Send {
422    /// Returns the properties of the scanner.
423    fn properties(&self) -> &ScannerProperties;
424
425    /// Returns the schema of the record batches.
426    fn schema(&self) -> SchemaRef;
427
428    /// Returns the metadata of the region.
429    fn metadata(&self) -> RegionMetadataRef;
430
431    /// Prepares the scanner with the given partition ranges.
432    ///
433    /// This method is for the planner to adjust the scanner's behavior based on the partition ranges.
434    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError>;
435
436    /// Scans the partition and returns a stream of record batches.
437    ///
438    /// # Panics
439    /// Panics if the `partition` is out of bound.
440    fn scan_partition(
441        &self,
442        ctx: &QueryScanContext,
443        metrics_set: &ExecutionPlanMetricsSet,
444        partition: usize,
445    ) -> Result<SendableRecordBatchStream, BoxedError>;
446
447    /// Check if there is any predicate that may be executed in this scanner.
448    fn has_predicate(&self) -> bool;
449
450    /// Sets whether the scanner is reading a logical region.
451    fn set_logical_region(&mut self, logical_region: bool);
452}
453
454pub type RegionScannerRef = Box<dyn RegionScanner>;
455
456pub type BatchResponses = Vec<(RegionId, Result<RegionResponse, BoxedError>)>;
457
458/// Represents the statistics of a region.
459#[derive(Debug, Deserialize, Serialize, Default)]
460pub struct RegionStatistic {
461    /// The number of rows
462    #[serde(default)]
463    pub num_rows: u64,
464    /// The size of memtable in bytes.
465    pub memtable_size: u64,
466    /// The size of WAL in bytes.
467    pub wal_size: u64,
468    /// The size of manifest in bytes.
469    pub manifest_size: u64,
470    /// The size of SST data files in bytes.
471    pub sst_size: u64,
472    /// The num of SST files.
473    pub sst_num: u64,
474    /// The size of SST index files in bytes.
475    #[serde(default)]
476    pub index_size: u64,
477    /// The details of the region.
478    #[serde(default)]
479    pub manifest: RegionManifestInfo,
480    #[serde(default)]
481    /// The total bytes written of the region since region opened.
482    pub written_bytes: u64,
483    /// The latest entry id of the region's remote WAL since last flush.
484    /// For metric engine, there're two latest entry ids, one for data and one for metadata.
485    /// TODO(weny): remove this two fields and use single instead.
486    #[serde(default)]
487    pub data_topic_latest_entry_id: u64,
488    #[serde(default)]
489    pub metadata_topic_latest_entry_id: u64,
490}
491
492/// The manifest info of a region.
493#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
494pub enum RegionManifestInfo {
495    Mito {
496        manifest_version: u64,
497        flushed_entry_id: u64,
498    },
499    Metric {
500        data_manifest_version: u64,
501        data_flushed_entry_id: u64,
502        metadata_manifest_version: u64,
503        metadata_flushed_entry_id: u64,
504    },
505}
506
507impl RegionManifestInfo {
508    /// Creates a new [RegionManifestInfo] for mito2 engine.
509    pub fn mito(manifest_version: u64, flushed_entry_id: u64) -> Self {
510        Self::Mito {
511            manifest_version,
512            flushed_entry_id,
513        }
514    }
515
516    /// Creates a new [RegionManifestInfo] for metric engine.
517    pub fn metric(
518        data_manifest_version: u64,
519        data_flushed_entry_id: u64,
520        metadata_manifest_version: u64,
521        metadata_flushed_entry_id: u64,
522    ) -> Self {
523        Self::Metric {
524            data_manifest_version,
525            data_flushed_entry_id,
526            metadata_manifest_version,
527            metadata_flushed_entry_id,
528        }
529    }
530
531    /// Returns true if the region is a mito2 region.
532    pub fn is_mito(&self) -> bool {
533        matches!(self, RegionManifestInfo::Mito { .. })
534    }
535
536    /// Returns true if the region is a metric region.
537    pub fn is_metric(&self) -> bool {
538        matches!(self, RegionManifestInfo::Metric { .. })
539    }
540
541    /// Returns the flushed entry id of the data region.
542    pub fn data_flushed_entry_id(&self) -> u64 {
543        match self {
544            RegionManifestInfo::Mito {
545                flushed_entry_id, ..
546            } => *flushed_entry_id,
547            RegionManifestInfo::Metric {
548                data_flushed_entry_id,
549                ..
550            } => *data_flushed_entry_id,
551        }
552    }
553
554    /// Returns the manifest version of the data region.
555    pub fn data_manifest_version(&self) -> u64 {
556        match self {
557            RegionManifestInfo::Mito {
558                manifest_version, ..
559            } => *manifest_version,
560            RegionManifestInfo::Metric {
561                data_manifest_version,
562                ..
563            } => *data_manifest_version,
564        }
565    }
566
567    /// Returns the manifest version of the metadata region.
568    pub fn metadata_manifest_version(&self) -> Option<u64> {
569        match self {
570            RegionManifestInfo::Mito { .. } => None,
571            RegionManifestInfo::Metric {
572                metadata_manifest_version,
573                ..
574            } => Some(*metadata_manifest_version),
575        }
576    }
577
578    /// Returns the flushed entry id of the metadata region.
579    pub fn metadata_flushed_entry_id(&self) -> Option<u64> {
580        match self {
581            RegionManifestInfo::Mito { .. } => None,
582            RegionManifestInfo::Metric {
583                metadata_flushed_entry_id,
584                ..
585            } => Some(*metadata_flushed_entry_id),
586        }
587    }
588
589    /// Encodes a list of ([RegionId], [RegionManifestInfo]) to a byte array.
590    pub fn encode_list(manifest_infos: &[(RegionId, Self)]) -> serde_json::Result<Vec<u8>> {
591        serde_json::to_vec(manifest_infos)
592    }
593
594    /// Decodes a list of ([RegionId], [RegionManifestInfo]) from a byte array.
595    pub fn decode_list(value: &[u8]) -> serde_json::Result<Vec<(RegionId, Self)>> {
596        serde_json::from_slice(value)
597    }
598}
599
600impl Default for RegionManifestInfo {
601    fn default() -> Self {
602        Self::Mito {
603            manifest_version: 0,
604            flushed_entry_id: 0,
605        }
606    }
607}
608
609impl RegionStatistic {
610    /// Deserializes the region statistic to a byte array.
611    ///
612    /// Returns None if the deserialization fails.
613    pub fn deserialize_from_slice(value: &[u8]) -> Option<RegionStatistic> {
614        serde_json::from_slice(value).ok()
615    }
616
617    /// Serializes the region statistic to a byte array.
618    ///
619    /// Returns None if the serialization fails.
620    pub fn serialize_to_vec(&self) -> Option<Vec<u8>> {
621        serde_json::to_vec(self).ok()
622    }
623}
624
625impl RegionStatistic {
626    /// Returns the estimated disk size of the region.
627    pub fn estimated_disk_size(&self) -> u64 {
628        self.wal_size + self.sst_size + self.manifest_size + self.index_size
629    }
630}
631
632/// The response of syncing the manifest.
633#[derive(Debug)]
634pub enum SyncManifestResponse {
635    NotSupported,
636    Mito {
637        /// Indicates if the data region was synced.
638        synced: bool,
639    },
640    Metric {
641        /// Indicates if the metadata region was synced.
642        metadata_synced: bool,
643        /// Indicates if the data region was synced.
644        data_synced: bool,
645        /// The logical regions that were newly opened during the sync operation.
646        /// This only occurs after the metadata region has been successfully synced.
647        new_opened_logical_region_ids: Vec<RegionId>,
648    },
649}
650
651impl SyncManifestResponse {
652    /// Returns true if data region is synced.
653    pub fn is_data_synced(&self) -> bool {
654        match self {
655            SyncManifestResponse::NotSupported => false,
656            SyncManifestResponse::Mito { synced } => *synced,
657            SyncManifestResponse::Metric { data_synced, .. } => *data_synced,
658        }
659    }
660
661    /// Returns true if the engine is supported the sync operation.
662    pub fn is_supported(&self) -> bool {
663        matches!(self, SyncManifestResponse::NotSupported)
664    }
665
666    /// Returns true if the engine is a mito2 engine.
667    pub fn is_mito(&self) -> bool {
668        matches!(self, SyncManifestResponse::Mito { .. })
669    }
670
671    /// Returns true if the engine is a metric engine.
672    pub fn is_metric(&self) -> bool {
673        matches!(self, SyncManifestResponse::Metric { .. })
674    }
675
676    /// Returns the new opened logical region ids.
677    pub fn new_opened_logical_region_ids(self) -> Option<Vec<RegionId>> {
678        match self {
679            SyncManifestResponse::Metric {
680                new_opened_logical_region_ids,
681                ..
682            } => Some(new_opened_logical_region_ids),
683            _ => None,
684        }
685    }
686}
687
688#[async_trait]
689pub trait RegionEngine: Send + Sync {
690    /// Name of this engine
691    fn name(&self) -> &str;
692
693    /// Handles batch open region requests.
694    async fn handle_batch_open_requests(
695        &self,
696        parallelism: usize,
697        requests: Vec<(RegionId, RegionOpenRequest)>,
698    ) -> Result<BatchResponses, BoxedError> {
699        let semaphore = Arc::new(Semaphore::new(parallelism));
700        let mut tasks = Vec::with_capacity(requests.len());
701
702        for (region_id, request) in requests {
703            let semaphore_moved = semaphore.clone();
704
705            tasks.push(async move {
706                // Safety: semaphore must exist
707                let _permit = semaphore_moved.acquire().await.unwrap();
708                let result = self
709                    .handle_request(region_id, RegionRequest::Open(request))
710                    .await;
711                (region_id, result)
712            });
713        }
714
715        Ok(join_all(tasks).await)
716    }
717
718    async fn handle_batch_ddl_requests(
719        &self,
720        request: BatchRegionDdlRequest,
721    ) -> Result<RegionResponse, BoxedError> {
722        let requests = request.into_region_requests();
723
724        let mut affected_rows = 0;
725        let mut extensions = HashMap::new();
726
727        for (region_id, request) in requests {
728            let result = self.handle_request(region_id, request).await?;
729            affected_rows += result.affected_rows;
730            extensions.extend(result.extensions);
731        }
732
733        Ok(RegionResponse {
734            affected_rows,
735            extensions,
736            metadata: Vec::new(),
737        })
738    }
739
740    /// Handles non-query request to the region. Returns the count of affected rows.
741    async fn handle_request(
742        &self,
743        region_id: RegionId,
744        request: RegionRequest,
745    ) -> Result<RegionResponse, BoxedError>;
746
747    /// Returns the committed sequence (sequence of latest written data).
748    async fn get_committed_sequence(
749        &self,
750        region_id: RegionId,
751    ) -> Result<SequenceNumber, BoxedError>;
752
753    /// Handles query and return a scanner that can be used to scan the region concurrently.
754    async fn handle_query(
755        &self,
756        region_id: RegionId,
757        request: ScanRequest,
758    ) -> Result<RegionScannerRef, BoxedError>;
759
760    /// Retrieves region's metadata.
761    async fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef, BoxedError>;
762
763    /// Retrieves region's statistic.
764    fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic>;
765
766    /// Stops the engine
767    async fn stop(&self) -> Result<(), BoxedError>;
768
769    /// Sets [RegionRole] for a region.
770    ///
771    /// The engine checks whether the region is writable before writing to the region. Setting
772    /// the region as readonly doesn't guarantee that write operations in progress will not
773    /// take effect.
774    fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError>;
775
776    /// Syncs the region manifest to the given manifest version.
777    async fn sync_region(
778        &self,
779        region_id: RegionId,
780        manifest_info: RegionManifestInfo,
781    ) -> Result<SyncManifestResponse, BoxedError>;
782
783    /// Sets region role state gracefully.
784    ///
785    /// After the call returns, the engine ensures no more write operations will succeed in the region.
786    async fn set_region_role_state_gracefully(
787        &self,
788        region_id: RegionId,
789        region_role_state: SettableRegionRoleState,
790    ) -> Result<SetRegionRoleStateResponse, BoxedError>;
791
792    /// Indicates region role.
793    ///
794    /// Returns the `None` if the region is not found.
795    fn role(&self, region_id: RegionId) -> Option<RegionRole>;
796
797    fn as_any(&self) -> &dyn Any;
798}
799
800pub type RegionEngineRef = Arc<dyn RegionEngine>;
801
802/// A [RegionScanner] that only scans a single partition.
803pub struct SinglePartitionScanner {
804    stream: Mutex<Option<SendableRecordBatchStream>>,
805    schema: SchemaRef,
806    properties: ScannerProperties,
807    metadata: RegionMetadataRef,
808}
809
810impl SinglePartitionScanner {
811    /// Creates a new [SinglePartitionScanner] with the given stream and metadata.
812    pub fn new(
813        stream: SendableRecordBatchStream,
814        append_mode: bool,
815        metadata: RegionMetadataRef,
816    ) -> Self {
817        let schema = stream.schema();
818        Self {
819            stream: Mutex::new(Some(stream)),
820            schema,
821            properties: ScannerProperties::default().with_append_mode(append_mode),
822            metadata,
823        }
824    }
825}
826
827impl Debug for SinglePartitionScanner {
828    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
829        write!(f, "SinglePartitionScanner: <SendableRecordBatchStream>")
830    }
831}
832
833impl RegionScanner for SinglePartitionScanner {
834    fn properties(&self) -> &ScannerProperties {
835        &self.properties
836    }
837
838    fn schema(&self) -> SchemaRef {
839        self.schema.clone()
840    }
841
842    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
843        self.properties.prepare(request);
844        Ok(())
845    }
846
847    fn scan_partition(
848        &self,
849        _ctx: &QueryScanContext,
850        _metrics_set: &ExecutionPlanMetricsSet,
851        _partition: usize,
852    ) -> Result<SendableRecordBatchStream, BoxedError> {
853        let mut stream = self.stream.lock().unwrap();
854        let result = stream
855            .take()
856            .or_else(|| Some(Box::pin(EmptyRecordBatchStream::new(self.schema.clone()))));
857        Ok(result.unwrap())
858    }
859
860    fn has_predicate(&self) -> bool {
861        false
862    }
863
864    fn metadata(&self) -> RegionMetadataRef {
865        self.metadata.clone()
866    }
867
868    fn set_logical_region(&mut self, logical_region: bool) {
869        self.properties.set_logical_region(logical_region);
870    }
871}
872
873impl DisplayAs for SinglePartitionScanner {
874    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
875        write!(f, "{:?}", self)
876    }
877}