meta_srv/procedure/repartition/group.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

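// One submodule per state of the repartition group state machine (see the
// `State` trait below), plus shared helpers in `utils`.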
pub(crate) mod apply_staging_manifest;
pub(crate) mod enter_staging_region;
pub(crate) mod remap_manifest;
pub(crate) mod repartition_end;
pub(crate) mod repartition_start;
pub(crate) mod sync_region;
pub(crate) mod update_metadata;
pub(crate) mod utils;

use std::any::Any;
use std::collections::HashMap;
use std::fmt::{Debug, Display};
use std::time::{Duration, Instant};

use common_error::ext::BoxedError;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::ddl::DdlContext;
use common_meta::instruction::CacheIdent;
use common_meta::key::datanode_table::{DatanodeTableValue, RegionInfo};
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock};
use common_meta::peer::Peer;
use common_meta::rpc::router::RegionRoute;
use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
use common_procedure::{
    Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
    Result as ProcedureResult, Status, StringKey, UserMetadata,
};
use common_telemetry::{error, info};
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::storage::{RegionId, TableId};
use uuid::Uuid;

use crate::error::{self, Result};
use crate::procedure::repartition::group::repartition_start::RepartitionStart;
use crate::procedure::repartition::plan::RegionDescriptor;
use crate::procedure::repartition::utils::get_datanode_table_value;
use crate::procedure::repartition;
use crate::service::mailbox::MailboxRef;

#[derive(Debug, Clone, Default)]
pub struct Metrics {
    /// Elapsed time of flushing pending deallocate regions.
    flush_pending_deallocate_regions_elapsed: Duration,
    /// Elapsed time of entering staging region.
    enter_staging_region_elapsed: Duration,
    /// Elapsed time of applying staging manifest.
    apply_staging_manifest_elapsed: Duration,
    /// Elapsed time of remapping manifest.
    remap_manifest_elapsed: Duration,
    /// Elapsed time of updating metadata.
    update_metadata_elapsed: Duration,
}

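// `Display` prints the total first, then only the stages with non-zero timings,
// e.g. (hypothetical values): `total: 1.5s, enter_staging_region_elapsed: 500ms,
// update_metadata_elapsed: 1s`.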
impl Display for Metrics {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let total = self.flush_pending_deallocate_regions_elapsed
            + self.enter_staging_region_elapsed
            + self.apply_staging_manifest_elapsed
            + self.remap_manifest_elapsed
            + self.update_metadata_elapsed;
        write!(f, "total: {:?}", total)?;
        let mut parts = Vec::with_capacity(5);
        if self.flush_pending_deallocate_regions_elapsed > Duration::ZERO {
            parts.push(format!(
                "flush_pending_deallocate_regions_elapsed: {:?}",
                self.flush_pending_deallocate_regions_elapsed
            ));
        }
        if self.enter_staging_region_elapsed > Duration::ZERO {
            parts.push(format!(
                "enter_staging_region_elapsed: {:?}",
                self.enter_staging_region_elapsed
            ));
        }
        if self.apply_staging_manifest_elapsed > Duration::ZERO {
            parts.push(format!(
                "apply_staging_manifest_elapsed: {:?}",
                self.apply_staging_manifest_elapsed
            ));
        }
        if self.remap_manifest_elapsed > Duration::ZERO {
            parts.push(format!(
                "remap_manifest_elapsed: {:?}",
                self.remap_manifest_elapsed
            ));
        }
        if self.update_metadata_elapsed > Duration::ZERO {
            parts.push(format!(
                "update_metadata_elapsed: {:?}",
                self.update_metadata_elapsed
            ));
        }

        if !parts.is_empty() {
            write!(f, ", {}", parts.join(", "))?;
        }
        Ok(())
    }
}

impl Metrics {
    /// Updates the elapsed time of entering staging region.
    pub fn update_enter_staging_region_elapsed(&mut self, elapsed: Duration) {
        self.enter_staging_region_elapsed += elapsed;
    }

    /// Updates the elapsed time of flushing pending deallocate regions.
    pub fn update_flush_pending_deallocate_regions_elapsed(&mut self, elapsed: Duration) {
        self.flush_pending_deallocate_regions_elapsed += elapsed;
    }

    /// Updates the elapsed time of applying staging manifest.
    pub fn update_apply_staging_manifest_elapsed(&mut self, elapsed: Duration) {
        self.apply_staging_manifest_elapsed += elapsed;
    }

    /// Updates the elapsed time of remapping manifest.
    pub fn update_remap_manifest_elapsed(&mut self, elapsed: Duration) {
        self.remap_manifest_elapsed += elapsed;
    }

    /// Updates the elapsed time of updating metadata.
    pub fn update_update_metadata_elapsed(&mut self, elapsed: Duration) {
        self.update_metadata_elapsed += elapsed;
    }
}

/// The unique identifier of a repartition group.
pub type GroupId = Uuid;

/// Procedure that drives a repartition group through its state machine.
pub struct RepartitionGroupProcedure {
    state: Box<dyn State>,
    context: Context,
}
#[derive(Debug, Serialize)]
struct RepartitionGroupData<'a> {
    persistent_ctx: &'a PersistentContext,
    state: &'a dyn State,
}

#[derive(Debug, Deserialize)]
struct RepartitionGroupDataOwned {
    persistent_ctx: PersistentContext,
    state: Box<dyn State>,
}

impl RepartitionGroupProcedure {
    pub(crate) const TYPE_NAME: &'static str = "metasrv-procedure::RepartitionGroup";

    pub fn new(persistent_context: PersistentContext, context: &repartition::Context) -> Self {
        let state = Box::new(RepartitionStart);

        Self {
            state,
            context: Context {
                persistent_ctx: persistent_context,
                cache_invalidator: context.cache_invalidator.clone(),
                table_metadata_manager: context.table_metadata_manager.clone(),
                mailbox: context.mailbox.clone(),
                server_addr: context.server_addr.clone(),
                start_time: Instant::now(),
                volatile_ctx: VolatileContext::default(),
            },
        }
    }

    pub fn from_json<F>(json: &str, ctx_factory: F) -> ProcedureResult<Self>
    where
        F: FnOnce(PersistentContext) -> Context,
    {
        let RepartitionGroupDataOwned {
            state,
            persistent_ctx,
        } = serde_json::from_str(json).context(FromJsonSnafu)?;
        let context = ctx_factory(persistent_ctx);

        Ok(Self { state, context })
    }
}

#[async_trait::async_trait]
impl Procedure for RepartitionGroupProcedure {
    fn type_name(&self) -> &str {
        Self::TYPE_NAME
    }

    async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
        let state = &mut self.state;
        let state_name = state.name();
        // Log the state transition for observability.
        info!(
            "Repartition group procedure executing state: {}, group id: {}, table id: {}",
            state_name,
            self.context.persistent_ctx.group_id,
            self.context.persistent_ctx.table_id
        );

        match state.next(&mut self.context, ctx).await {
            Ok((next, status)) => {
                *state = next;
                Ok(status)
            }
            Err(e) => {
                if e.is_retryable() {
                    Err(ProcedureError::retry_later(e))
                } else {
                    error!(
                        e;
                        "Repartition group procedure failed, group id: {}, table id: {}",
                        self.context.persistent_ctx.group_id,
                        self.context.persistent_ctx.table_id,
                    );
                    Err(ProcedureError::external(e))
                }
            }
        }
    }

    fn rollback_supported(&self) -> bool {
        false
    }

    fn dump(&self) -> ProcedureResult<String> {
        let data = RepartitionGroupData {
            persistent_ctx: &self.context.persistent_ctx,
            state: self.state.as_ref(),
        };
        serde_json::to_string(&data).context(ToJsonSnafu)
    }

    fn lock_key(&self) -> LockKey {
        LockKey::new(self.context.persistent_ctx.lock_key())
    }

    fn user_metadata(&self) -> Option<UserMetadata> {
        // TODO(weny): support user metadata.
        None
    }
}

/// Runtime context shared by all states of a repartition group procedure.
pub struct Context {
    pub persistent_ctx: PersistentContext,

    pub cache_invalidator: CacheInvalidatorRef,

    pub table_metadata_manager: TableMetadataManagerRef,

    pub mailbox: MailboxRef,

    pub server_addr: String,

    pub start_time: Instant,

    pub volatile_ctx: VolatileContext,
}

/// Volatile state that is not persisted across procedure restarts.
#[derive(Debug, Clone, Default)]
pub struct VolatileContext {
    pub metrics: Metrics,
}

impl Context {
    pub fn new(
        ddl_ctx: &DdlContext,
        mailbox: MailboxRef,
        server_addr: String,
        persistent_ctx: PersistentContext,
    ) -> Self {
        Self {
            persistent_ctx,
            cache_invalidator: ddl_ctx.cache_invalidator.clone(),
            table_metadata_manager: ddl_ctx.table_metadata_manager.clone(),
            mailbox,
            server_addr,
            start_time: Instant::now(),
            volatile_ctx: VolatileContext::default(),
        }
    }
}

/// The result of the group preparation phase, containing validated region routes.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct GroupPrepareResult {
    /// The validated source region routes.
    pub source_routes: Vec<RegionRoute>,
    /// The validated target region routes.
    pub target_routes: Vec<RegionRoute>,
    /// The primary source region id (first source region), used for retrieving region options.
    pub central_region: RegionId,
    /// The peer where the primary source region is located.
    pub central_region_datanode: Peer,
}

/// The state of a repartition group that is persisted across procedure restarts.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PersistentContext {
    /// The unique id of the repartition group.
    pub group_id: GroupId,
    /// The table id of the repartition group.
    pub table_id: TableId,
    /// The catalog name of the repartition group.
    pub catalog_name: String,
    /// The schema name of the repartition group.
    pub schema_name: String,
    /// The source regions of the repartition group.
    pub sources: Vec<RegionDescriptor>,
    /// The target regions of the repartition group.
    pub targets: Vec<RegionDescriptor>,
    /// For each `source region`, the corresponding
    /// `target regions` that overlap with it.
    pub region_mapping: HashMap<RegionId, Vec<RegionId>>,
    /// The result of group prepare.
    /// The value will be set in the [RepartitionStart](crate::procedure::repartition::group::repartition_start::RepartitionStart) state.
    pub group_prepare_result: Option<GroupPrepareResult>,
    /// The staging manifest paths of the repartition group.
    /// The value will be set in the [RemapManifest](crate::procedure::repartition::group::remap_manifest::RemapManifest) state.
    pub staging_manifest_paths: HashMap<RegionId, String>,
    /// Whether sync region is needed for this group.
    pub sync_region: bool,
    /// The region ids of the newly allocated regions.
    pub allocated_region_ids: Vec<RegionId>,
    /// The region ids of the regions that are pending deallocation.
    pub pending_deallocate_region_ids: Vec<RegionId>,
    /// The timeout for repartition operations.
    #[serde(with = "humantime_serde")]
    pub timeout: Duration,
}

impl PersistentContext {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        group_id: GroupId,
        table_id: TableId,
        catalog_name: String,
        schema_name: String,
        sources: Vec<RegionDescriptor>,
        targets: Vec<RegionDescriptor>,
        region_mapping: HashMap<RegionId, Vec<RegionId>>,
        sync_region: bool,
        allocated_region_ids: Vec<RegionId>,
        pending_deallocate_region_ids: Vec<RegionId>,
        timeout: Duration,
    ) -> Self {
        Self {
            group_id,
            table_id,
            catalog_name,
            schema_name,
            sources,
            targets,
            region_mapping,
            group_prepare_result: None,
            staging_manifest_paths: HashMap::new(),
            sync_region,
            allocated_region_ids,
            pending_deallocate_region_ids,
            timeout,
        }
    }

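    /// Builds the procedure lock keys: shared catalog and schema locks plus an
    /// exclusive lock for each source region.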
    pub fn lock_key(&self) -> Vec<StringKey> {
        let mut lock_keys = Vec::with_capacity(2 + self.sources.len());
        lock_keys.extend([
            CatalogLock::Read(&self.catalog_name).into(),
            SchemaLock::read(&self.catalog_name, &self.schema_name).into(),
        ]);
        for source in &self.sources {
            lock_keys.push(RegionLock::Write(source.region_id).into());
        }
        lock_keys
    }
}

impl Context {
    /// Retrieves the table route value for the given table id.
    ///
    /// Retry:
    /// - Failed to retrieve the table metadata.
    ///
    /// Abort:
    /// - Table route not found.
    pub async fn get_table_route_value(
        &self,
    ) -> Result<DeserializedValueWithBytes<TableRouteValue>> {
        let table_id = self.persistent_ctx.table_id;
        let group_id = self.persistent_ctx.group_id;
        let table_route_value = self
            .table_metadata_manager
            .table_route_manager()
            .table_route_storage()
            .get_with_raw_bytes(table_id)
            .await
            .map_err(BoxedError::new)
            .with_context(|_| error::RetryLaterWithSourceSnafu {
                reason: format!(
                    "Failed to get table route for table: {}, repartition group: {}",
                    table_id, group_id
                ),
            })?
            .context(error::TableRouteNotFoundSnafu { table_id })?;

        Ok(table_route_value)
    }

    /// Returns the `datanode_table_value` for the given table id and datanode id.
    ///
    /// Retry:
    /// - Failed to retrieve the datanode table metadata.
    pub async fn get_datanode_table_value(
        &self,
        table_id: TableId,
        datanode_id: u64,
    ) -> Result<DatanodeTableValue> {
        get_datanode_table_value(&self.table_metadata_manager, table_id, datanode_id).await
    }

    /// Broadcasts the invalidate table cache message.
    pub async fn invalidate_table_cache(&self) -> Result<()> {
        let table_id = self.persistent_ctx.table_id;
        let group_id = self.persistent_ctx.group_id;
        let subject = format!(
            "Invalidate table cache for repartition table, group: {}, table: {}",
            group_id, table_id,
        );
        let ctx = common_meta::cache_invalidator::Context {
            subject: Some(subject),
        };
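        // Best-effort broadcast: the result is deliberately discarded, so a
        // failed invalidation never aborts the procedure.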
        let _ = self
            .cache_invalidator
            .invalidate(&ctx, &[CacheIdent::TableId(table_id)])
            .await;
        Ok(())
    }

    /// Updates the table route.
    ///
    /// Retry:
    /// - Failed to retrieve the datanode table metadata.
    ///
    /// Abort:
    /// - Table route not found.
    /// - Failed to update the table route.
    pub async fn update_table_route(
        &self,
        current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
        new_region_routes: Vec<RegionRoute>,
    ) -> Result<()> {
        let table_id = self.persistent_ctx.table_id;
        let group_id = self.persistent_ctx.group_id;
        // Safety: prepare result is set in [RepartitionStart] state.
        let prepare_result = self.persistent_ctx.group_prepare_result.as_ref().unwrap();
        let central_region_datanode_table_value = self
            .get_datanode_table_value(table_id, prepare_result.central_region_datanode.id)
            .await?;
        let RegionInfo {
            region_options,
            region_wal_options,
            ..
        } = &central_region_datanode_table_value.region_info;

        info!(
            "Updating table route for table: {}, group_id: {}, new region routes: {:?}",
            table_id, group_id, new_region_routes
        );
        self.table_metadata_manager
            .update_table_route(
                table_id,
                central_region_datanode_table_value.region_info.clone(),
                current_table_route_value,
                new_region_routes,
                region_options,
                region_wal_options,
            )
            .await
            .context(error::TableMetadataManagerSnafu)
    }

    /// Updates the table repart mapping.
    pub async fn update_table_repart_mapping(&self) -> Result<()> {
        info!(
            "Updating table repart mapping for table: {}, group_id: {}, region mapping: {:?}",
            self.persistent_ctx.table_id,
            self.persistent_ctx.group_id,
            self.persistent_ctx.region_mapping
        );

        self.table_metadata_manager
            .table_repart_manager()
            .update_mappings(
                self.persistent_ctx.table_id,
                &self.persistent_ctx.region_mapping,
            )
            .await
            .context(error::TableMetadataManagerSnafu)
    }

    /// Returns the time budget remaining for the next operation.
    ///
    /// Returns `None` when the procedure's timeout has already been exhausted.
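    ///
    /// For example (illustrative values): with `timeout = 10s` and 4s already
    /// elapsed, this returns `Some(6s)`; once more than 10s have elapsed, it
    /// returns `None`.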
    pub fn next_operation_timeout(&self) -> Option<Duration> {
        self.persistent_ctx
            .timeout
            .checked_sub(self.start_time.elapsed())
    }

    /// Updates the elapsed time of entering staging region.
    pub fn update_enter_staging_region_elapsed(&mut self, elapsed: Duration) {
        self.volatile_ctx
            .metrics
            .update_enter_staging_region_elapsed(elapsed);
    }

    /// Updates the elapsed time of flushing pending deallocate regions.
    pub fn update_flush_pending_deallocate_regions_elapsed(&mut self, elapsed: Duration) {
        self.volatile_ctx
            .metrics
            .update_flush_pending_deallocate_regions_elapsed(elapsed);
    }

    /// Updates the elapsed time of applying staging manifest.
    pub fn update_apply_staging_manifest_elapsed(&mut self, elapsed: Duration) {
        self.volatile_ctx
            .metrics
            .update_apply_staging_manifest_elapsed(elapsed);
    }

    /// Updates the elapsed time of remapping manifest.
    pub fn update_remap_manifest_elapsed(&mut self, elapsed: Duration) {
        self.volatile_ctx
            .metrics
            .update_remap_manifest_elapsed(elapsed);
    }

    /// Updates the elapsed time of updating metadata.
    pub fn update_update_metadata_elapsed(&mut self, elapsed: Duration) {
        self.volatile_ctx
            .metrics
            .update_update_metadata_elapsed(elapsed);
    }
}

/// Returns the region routes of the given table route value.
///
/// Abort:
/// - Table route value is not physical.
pub fn region_routes(
    table_id: TableId,
    table_route_value: &TableRouteValue,
) -> Result<&Vec<RegionRoute>> {
    table_route_value
        .region_routes()
        .with_context(|_| error::UnexpectedLogicalRouteTableSnafu {
            err_msg: format!(
                "TableRoute({:?}) is a non-physical TableRouteValue.",
                table_id
            ),
        })
}

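// `typetag` makes `Box<dyn State>` serializable: each concrete state is tagged
// with its type name under the `repartition_group_state` key, so the state
// machine can be restored from a procedure dump.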
#[async_trait::async_trait]
#[typetag::serde(tag = "repartition_group_state")]
pub(crate) trait State: Sync + Send + Debug {
    fn name(&self) -> &'static str {
        let type_name = std::any::type_name::<Self>();
        // Keep only the trailing segment of the fully-qualified type name.
        type_name.split("::").last().unwrap_or(type_name)
    }

    /// Yields the next [State] and [Status].
    async fn next(
        &mut self,
        ctx: &mut Context,
        procedure_ctx: &ProcedureContext,
    ) -> Result<(Box<dyn State>, Status)>;

    fn as_any(&self) -> &dyn Any;
}

#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::sync::Arc;

    use common_meta::key::TableMetadataManager;
    use common_meta::kv_backend::test_util::MockKvBackendBuilder;

    use crate::error::Error;
    use crate::procedure::repartition::test_util::{TestingEnv, new_persistent_context};

    #[tokio::test]
    async fn test_get_table_route_value_not_found_error() {
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context(1024, vec![], vec![]);
        let ctx = env.create_context(persistent_context);
        let err = ctx.get_table_route_value().await.unwrap_err();
        assert_matches!(err, Error::TableRouteNotFound { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_get_table_route_value_retry_error() {
        let kv = MockKvBackendBuilder::default()
            .range_fn(Arc::new(|_| {
                common_meta::error::UnexpectedSnafu {
                    err_msg: "mock err",
                }
                .fail()
            }))
            .build()
            .unwrap();
        let mut env = TestingEnv::new();
        env.table_metadata_manager = Arc::new(TableMetadataManager::new(Arc::new(kv)));
        let persistent_context = new_persistent_context(1024, vec![], vec![]);
        let ctx = env.create_context(persistent_context);
        let err = ctx.get_table_route_value().await.unwrap_err();
        assert!(err.is_retryable());
    }

    #[tokio::test]
    async fn test_get_datanode_table_value_retry_error() {
        let kv = MockKvBackendBuilder::default()
            .range_fn(Arc::new(|_| {
                common_meta::error::UnexpectedSnafu {
                    err_msg: "mock err",
                }
                .fail()
            }))
            .build()
            .unwrap();
        let mut env = TestingEnv::new();
        env.table_metadata_manager = Arc::new(TableMetadataManager::new(Arc::new(kv)));
        let persistent_context = new_persistent_context(1024, vec![], vec![]);
        let ctx = env.create_context(persistent_context);
        let err = ctx.get_datanode_table_value(1024, 1).await.unwrap_err();
        assert!(err.is_retryable());
    }
}
651}