// File: src/meta-srv/src/procedure/repartition/group.rs
1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub(crate) mod apply_staging_manifest;
16pub(crate) mod enter_staging_region;
17pub(crate) mod remap_manifest;
18pub(crate) mod repartition_end;
19pub(crate) mod repartition_start;
20pub(crate) mod sync_region;
21pub(crate) mod update_metadata;
22pub(crate) mod utils;
23
24use std::any::Any;
25use std::collections::HashMap;
26use std::fmt::{Debug, Display};
27use std::time::{Duration, Instant};
28
29use common_error::ext::BoxedError;
30use common_meta::cache_invalidator::CacheInvalidatorRef;
31use common_meta::ddl::DdlContext;
32use common_meta::instruction::CacheIdent;
33use common_meta::key::datanode_table::{DatanodeTableValue, RegionInfo};
34use common_meta::key::table_route::TableRouteValue;
35use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
36use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock};
37use common_meta::peer::Peer;
38use common_meta::rpc::router::RegionRoute;
39use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
40use common_procedure::{
41    Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
42    Result as ProcedureResult, Status, StringKey, UserMetadata,
43};
44use common_telemetry::{error, info};
45use serde::{Deserialize, Serialize};
46use snafu::{OptionExt, ResultExt};
47use store_api::storage::{RegionId, TableId};
48use uuid::Uuid;
49
50use crate::error::{self, Result};
51use crate::procedure::repartition::group::repartition_start::RepartitionStart;
52use crate::procedure::repartition::plan::RegionDescriptor;
53use crate::procedure::repartition::utils::get_datanode_table_value;
54use crate::procedure::repartition::{self};
55use crate::service::mailbox::MailboxRef;
56
/// Per-phase elapsed-time metrics for a repartition group procedure.
///
/// Each field accumulates across calls to the corresponding `update_*` method
/// (the updaters use `+=`), so a value reflects the total time spent in that
/// phase, including retries.
#[derive(Debug, Clone, Default)]
pub struct Metrics {
    /// Elapsed time of flushing pending deallocate regions.
    flush_pending_deallocate_regions_elapsed: Duration,
    /// Elapsed time of entering staging region.
    enter_staging_region_elapsed: Duration,
    /// Elapsed time of applying staging manifest.
    apply_staging_manifest_elapsed: Duration,
    /// Elapsed time of remapping manifest.
    remap_manifest_elapsed: Duration,
    /// Elapsed time of updating metadata.
    update_metadata_elapsed: Duration,
}
70
71impl Display for Metrics {
72    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73        let total = self.flush_pending_deallocate_regions_elapsed
74            + self.enter_staging_region_elapsed
75            + self.apply_staging_manifest_elapsed
76            + self.remap_manifest_elapsed
77            + self.update_metadata_elapsed;
78        write!(f, "total: {:?}", total)?;
79        let mut parts = Vec::with_capacity(5);
80        if self.flush_pending_deallocate_regions_elapsed > Duration::ZERO {
81            parts.push(format!(
82                "flush_pending_deallocate_regions_elapsed: {:?}",
83                self.flush_pending_deallocate_regions_elapsed
84            ));
85        }
86        if self.enter_staging_region_elapsed > Duration::ZERO {
87            parts.push(format!(
88                "enter_staging_region_elapsed: {:?}",
89                self.enter_staging_region_elapsed
90            ));
91        }
92        if self.apply_staging_manifest_elapsed > Duration::ZERO {
93            parts.push(format!(
94                "apply_staging_manifest_elapsed: {:?}",
95                self.apply_staging_manifest_elapsed
96            ));
97        }
98        if self.remap_manifest_elapsed > Duration::ZERO {
99            parts.push(format!(
100                "remap_manifest_elapsed: {:?}",
101                self.remap_manifest_elapsed
102            ));
103        }
104        if self.update_metadata_elapsed > Duration::ZERO {
105            parts.push(format!(
106                "update_metadata_elapsed: {:?}",
107                self.update_metadata_elapsed
108            ));
109        }
110
111        if !parts.is_empty() {
112            write!(f, ", {}", parts.join(", "))?;
113        }
114        Ok(())
115    }
116}
117
impl Metrics {
    /// Updates the elapsed time of entering staging region.
    pub fn update_enter_staging_region_elapsed(&mut self, elapsed: Duration) {
        self.enter_staging_region_elapsed += elapsed;
    }

    /// Updates the elapsed time of flushing pending deallocate regions.
    pub fn update_flush_pending_deallocate_regions_elapsed(&mut self, elapsed: Duration) {
        self.flush_pending_deallocate_regions_elapsed += elapsed;
    }

    /// Updates the elapsed time of applying staging manifest.
    pub fn update_apply_staging_manifest_elapsed(&mut self, elapsed: Duration) {
        self.apply_staging_manifest_elapsed += elapsed;
    }

    /// Updates the elapsed time of remapping manifest.
    pub fn update_remap_manifest_elapsed(&mut self, elapsed: Duration) {
        self.remap_manifest_elapsed += elapsed;
    }

    /// Updates the elapsed time of updating metadata.
    pub fn update_update_metadata_elapsed(&mut self, elapsed: Duration) {
        self.update_metadata_elapsed += elapsed;
    }
}
143
/// Unique identifier of a repartition group.
pub type GroupId = Uuid;

/// Procedure driving the repartition state machine for a single group of regions.
pub struct RepartitionGroupProcedure {
    // Current state; replaced with the state returned by `State::next` on each
    // `execute` step.
    state: Box<dyn State>,
    // Persistent + volatile execution context shared with all states.
    context: Context,
}

/// Borrowed view serialized by [`Procedure::dump`]; avoids cloning the context.
#[derive(Debug, Serialize)]
struct RepartitionGroupData<'a> {
    persistent_ctx: &'a PersistentContext,
    state: &'a dyn State,
}

/// Owned counterpart of [`RepartitionGroupData`], used when deserializing in
/// [`RepartitionGroupProcedure::from_json`].
#[derive(Debug, Deserialize)]
struct RepartitionGroupDataOwned {
    persistent_ctx: PersistentContext,
    state: Box<dyn State>,
}
162
163impl RepartitionGroupProcedure {
164    pub(crate) const TYPE_NAME: &'static str = "metasrv-procedure::RepartitionGroup";
165
166    pub fn new(persistent_context: PersistentContext, context: &repartition::Context) -> Self {
167        let state = Box::new(RepartitionStart);
168
169        Self {
170            state,
171            context: Context {
172                persistent_ctx: persistent_context,
173                cache_invalidator: context.cache_invalidator.clone(),
174                table_metadata_manager: context.table_metadata_manager.clone(),
175                mailbox: context.mailbox.clone(),
176                server_addr: context.server_addr.clone(),
177                start_time: Instant::now(),
178                volatile_ctx: VolatileContext::default(),
179            },
180        }
181    }
182
183    pub fn from_json<F>(json: &str, ctx_factory: F) -> ProcedureResult<Self>
184    where
185        F: FnOnce(PersistentContext) -> Context,
186    {
187        let RepartitionGroupDataOwned {
188            state,
189            persistent_ctx,
190        } = serde_json::from_str(json).context(FromJsonSnafu)?;
191        let context = ctx_factory(persistent_ctx);
192
193        Ok(Self { state, context })
194    }
195}
196
#[async_trait::async_trait]
impl Procedure for RepartitionGroupProcedure {
    fn type_name(&self) -> &str {
        Self::TYPE_NAME
    }

    async fn rollback(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<()> {
        // The parent repartition procedure is responsible for rollback and recovery.
        // Subprocedures are not recovered after metasrv restarts, so implementing rollback for them is meaningless.
        Ok(())
    }

    // Drives the state machine one step: runs the current state's `next()`,
    // stores the returned state, and maps errors to retry-later vs. abort.
    #[tracing::instrument(skip_all, fields(
        state = %self.state.name(),
        table_id = self.context.persistent_ctx.table_id,
        group_id = %self.context.persistent_ctx.group_id,
    ))]
    async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
        let state = &mut self.state;
        let state_name = state.name();
        // Log state transition
        common_telemetry::info!(
            "Repartition group procedure executing state: {}, group id: {}, table id: {}",
            state_name,
            self.context.persistent_ctx.group_id,
            self.context.persistent_ctx.table_id
        );

        match state.next(&mut self.context, _ctx).await {
            Ok((next, status)) => {
                // Advance the state machine to the state returned by `next()`.
                *state = next;
                Ok(status)
            }
            Err(e) => {
                if e.is_retryable() {
                    // Transient failure: ask the procedure framework to retry later.
                    Err(ProcedureError::retry_later(e))
                } else {
                    // Non-retryable: log and abort the procedure.
                    error!(
                        e;
                        "Repartition group procedure failed, group id: {}, table id: {}",
                        self.context.persistent_ctx.group_id,
                        self.context.persistent_ctx.table_id,
                    );
                    Err(ProcedureError::external(e))
                }
            }
        }
    }

    fn rollback_supported(&self) -> bool {
        // Parent repartition owns rollback and recovery because subprocedures are
        // not relied on as durable rollback units across metasrv restarts.
        false
    }

    // Serializes the persistent context plus the current state;
    // `RepartitionGroupProcedure::from_json` is the inverse.
    fn dump(&self) -> ProcedureResult<String> {
        let data = RepartitionGroupData {
            persistent_ctx: &self.context.persistent_ctx,
            state: self.state.as_ref(),
        };
        serde_json::to_string(&data).context(ToJsonSnafu)
    }

    fn lock_key(&self) -> LockKey {
        LockKey::new(self.context.persistent_ctx.lock_key())
    }

    fn user_metadata(&self) -> Option<UserMetadata> {
        // TODO(weny): support user metadata.
        None
    }
}
269
/// Runtime context shared by all states of a [`RepartitionGroupProcedure`].
pub struct Context {
    /// State that survives restarts; serialized via [`Procedure::dump`].
    pub persistent_ctx: PersistentContext,

    /// Broadcasts cache-invalidation messages (see `invalidate_table_cache`).
    pub cache_invalidator: CacheInvalidatorRef,

    /// Access to table metadata: routes, datanode tables, repart mappings.
    pub table_metadata_manager: TableMetadataManagerRef,

    /// Mailbox for exchanging messages with other components.
    pub mailbox: MailboxRef,

    /// Address of this metasrv instance.
    pub server_addr: String,

    /// When this context was created; used by `next_operation_timeout` to
    /// compute the remaining time budget.
    pub start_time: Instant,

    /// In-memory state that is NOT persisted and is rebuilt on recovery.
    pub volatile_ctx: VolatileContext,
}

/// Volatile (non-persisted) part of [`Context`].
#[derive(Debug, Clone, Default)]
pub struct VolatileContext {
    /// Accumulated per-phase timing metrics.
    pub metrics: Metrics,
}
290
291impl Context {
292    pub fn new(
293        ddl_ctx: &DdlContext,
294        mailbox: MailboxRef,
295        server_addr: String,
296        persistent_ctx: PersistentContext,
297    ) -> Self {
298        Self {
299            persistent_ctx,
300            cache_invalidator: ddl_ctx.cache_invalidator.clone(),
301            table_metadata_manager: ddl_ctx.table_metadata_manager.clone(),
302            mailbox,
303            server_addr,
304            start_time: Instant::now(),
305            volatile_ctx: VolatileContext::default(),
306        }
307    }
308}
309
/// The result of the group preparation phase, containing validated region routes.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct GroupPrepareResult {
    /// The validated source region routes.
    pub source_routes: Vec<RegionRoute>,
    /// Validated target region routes used for metadata rollback (logical rollback).
    pub target_routes: Vec<RegionRoute>,
    /// The primary source region id (first source region), used for retrieving region options.
    pub central_region: RegionId,
    /// The peer where the primary source region is located; its datanode table
    /// value supplies region options in `Context::update_table_route`.
    pub central_region_datanode: Peer,
}
322
/// The durable state of a repartition group; serialized with the procedure and
/// restored on recovery (see [`RepartitionGroupProcedure::from_json`]).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PersistentContext {
    /// The unique id of this repartition group.
    pub group_id: GroupId,
    /// The table id of the repartition group.
    pub table_id: TableId,
    /// The catalog name of the repartition group.
    pub catalog_name: String,
    /// The schema name of the repartition group.
    pub schema_name: String,
    /// The source regions of the repartition group.
    pub sources: Vec<RegionDescriptor>,
    /// The target regions of the repartition group.
    pub targets: Vec<RegionDescriptor>,
    /// For each `source region`, the corresponding
    /// `target regions` that overlap with it.
    pub region_mapping: HashMap<RegionId, Vec<RegionId>>,
    /// The result of group prepare.
    /// The value will be set in [RepartitionStart](crate::procedure::repartition::group::repartition_start::RepartitionStart) state.
    pub group_prepare_result: Option<GroupPrepareResult>,
    /// The staging manifest paths of the repartition group.
    /// The value will be set in [RemapManifest](crate::procedure::repartition::group::remap_manifest::RemapManifest) state.
    pub staging_manifest_paths: HashMap<RegionId, String>,
    /// Whether sync region is needed for this group.
    pub sync_region: bool,
    /// The region ids of the newly allocated regions.
    pub allocated_region_ids: Vec<RegionId>,
    /// The region ids of the regions that are pending deallocation.
    pub pending_deallocate_region_ids: Vec<RegionId>,
    /// The timeout for repartition operations.
    #[serde(with = "humantime_serde")]
    pub timeout: Duration,
}
355
356impl PersistentContext {
357    #[allow(clippy::too_many_arguments)]
358    pub fn new(
359        group_id: GroupId,
360        table_id: TableId,
361        catalog_name: String,
362        schema_name: String,
363        sources: Vec<RegionDescriptor>,
364        targets: Vec<RegionDescriptor>,
365        region_mapping: HashMap<RegionId, Vec<RegionId>>,
366        sync_region: bool,
367        allocated_region_ids: Vec<RegionId>,
368        pending_deallocate_region_ids: Vec<RegionId>,
369        timeout: Duration,
370    ) -> Self {
371        Self {
372            group_id,
373            table_id,
374            catalog_name,
375            schema_name,
376            sources,
377            targets,
378            region_mapping,
379            group_prepare_result: None,
380            staging_manifest_paths: HashMap::new(),
381            sync_region,
382            allocated_region_ids,
383            pending_deallocate_region_ids,
384            timeout,
385        }
386    }
387
388    pub fn lock_key(&self) -> Vec<StringKey> {
389        let mut lock_keys = Vec::with_capacity(2 + self.sources.len());
390        lock_keys.extend([
391            CatalogLock::Read(&self.catalog_name).into(),
392            SchemaLock::read(&self.catalog_name, &self.schema_name).into(),
393        ]);
394        for source in &self.sources {
395            lock_keys.push(RegionLock::Write(source.region_id).into());
396        }
397        lock_keys
398    }
399}
400
impl Context {
    /// Retrieves the table route value for the given table id.
    ///
    /// Retry:
    /// - Failed to retrieve the metadata of table.
    ///
    /// Abort:
    /// - Table route not found.
    pub async fn get_table_route_value(
        &self,
    ) -> Result<DeserializedValueWithBytes<TableRouteValue>> {
        let table_id = self.persistent_ctx.table_id;
        let group_id = self.persistent_ctx.group_id;
        let table_route_value = self
            .table_metadata_manager
            .table_route_manager()
            .table_route_storage()
            .get_with_raw_bytes(table_id)
            .await
            .map_err(BoxedError::new)
            // Storage-level failures are surfaced as retryable.
            .with_context(|_| error::RetryLaterWithSourceSnafu {
                reason: format!(
                    "Failed to get table route for table: {}, repartition group: {}",
                    table_id, group_id
                ),
            })?
            // A missing route aborts the procedure (non-retryable).
            .context(error::TableRouteNotFoundSnafu { table_id })?;

        Ok(table_route_value)
    }

    /// Returns the `datanode_table_value`
    ///
    /// Retry:
    /// - Failed to retrieve the metadata of datanode table.
    pub async fn get_datanode_table_value(
        &self,
        table_id: TableId,
        datanode_id: u64,
    ) -> Result<DatanodeTableValue> {
        get_datanode_table_value(&self.table_metadata_manager, table_id, datanode_id).await
    }

    /// Broadcasts the invalidate table cache message.
    ///
    /// Best-effort: the broadcast result is deliberately ignored (`let _ =`),
    /// so this always returns `Ok(())` even if the invalidation fails.
    pub async fn invalidate_table_cache(&self) -> Result<()> {
        let table_id = self.persistent_ctx.table_id;
        let group_id = self.persistent_ctx.group_id;
        let subject = format!(
            "Invalidate table cache for repartition table, group: {}, table: {}",
            group_id, table_id,
        );
        let ctx = common_meta::cache_invalidator::Context {
            subject: Some(subject),
        };
        let _ = self
            .cache_invalidator
            .invalidate(&ctx, &[CacheIdent::TableId(table_id)])
            .await;
        Ok(())
    }

    /// Updates the table route.
    ///
    /// Region options/WAL options are taken from the central region's datanode
    /// table value (see [`GroupPrepareResult::central_region_datanode`]).
    ///
    /// Retry:
    /// - Failed to retrieve the metadata of datanode table.
    ///
    /// Abort:
    /// - Table route not found.
    /// - Failed to update the table route.
    pub async fn update_table_route(
        &self,
        current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
        new_region_routes: Vec<RegionRoute>,
    ) -> Result<()> {
        let table_id = self.persistent_ctx.table_id;
        let group_id = self.persistent_ctx.group_id;
        // Safety: prepare result is set in [RepartitionStart] state.
        let prepare_result = self.persistent_ctx.group_prepare_result.as_ref().unwrap();
        let central_region_datanode_table_value = self
            .get_datanode_table_value(table_id, prepare_result.central_region_datanode.id)
            .await?;
        let RegionInfo {
            region_options,
            region_wal_options,
            ..
        } = &central_region_datanode_table_value.region_info;

        info!(
            "Updating table route for table: {}, group_id: {}, new region routes: {:?}",
            table_id, group_id, new_region_routes
        );
        self.table_metadata_manager
            .update_table_route(
                table_id,
                central_region_datanode_table_value.region_info.clone(),
                current_table_route_value,
                new_region_routes,
                region_options,
                region_wal_options,
            )
            .await
            .context(error::TableMetadataManagerSnafu)
    }

    /// Updates the table repart mapping.
    pub async fn update_table_repart_mapping(&self) -> Result<()> {
        info!(
            "Updating table repart mapping for table: {}, group_id: {}, region mapping: {:?}",
            self.persistent_ctx.table_id,
            self.persistent_ctx.group_id,
            self.persistent_ctx.region_mapping
        );

        self.table_metadata_manager
            .table_repart_manager()
            .update_mappings(
                self.persistent_ctx.table_id,
                &self.persistent_ctx.region_mapping,
            )
            .await
            .context(error::TableMetadataManagerSnafu)
    }

    /// Returns the next operation timeout: the configured `timeout` minus the
    /// time elapsed since `start_time`.
    ///
    /// Returns `None` once the budget is exhausted (`checked_sub` underflows).
    pub fn next_operation_timeout(&self) -> Option<Duration> {
        self.persistent_ctx
            .timeout
            .checked_sub(self.start_time.elapsed())
    }

    /// Updates the elapsed time of entering staging region.
    pub fn update_enter_staging_region_elapsed(&mut self, elapsed: Duration) {
        self.volatile_ctx
            .metrics
            .update_enter_staging_region_elapsed(elapsed);
    }

    /// Updates the elapsed time of flushing pending deallocate regions.
    pub fn update_flush_pending_deallocate_regions_elapsed(&mut self, elapsed: Duration) {
        self.volatile_ctx
            .metrics
            .update_flush_pending_deallocate_regions_elapsed(elapsed);
    }

    /// Updates the elapsed time of applying staging manifest.
    pub fn update_apply_staging_manifest_elapsed(&mut self, elapsed: Duration) {
        self.volatile_ctx
            .metrics
            .update_apply_staging_manifest_elapsed(elapsed);
    }

    /// Updates the elapsed time of remapping manifest.
    pub fn update_remap_manifest_elapsed(&mut self, elapsed: Duration) {
        self.volatile_ctx
            .metrics
            .update_remap_manifest_elapsed(elapsed);
    }

    /// Updates the elapsed time of updating metadata.
    pub fn update_update_metadata_elapsed(&mut self, elapsed: Duration) {
        self.volatile_ctx
            .metrics
            .update_update_metadata_elapsed(elapsed);
    }
}
568
569/// Returns the region routes of the given table route value.
570///
571/// Abort:
572/// - Table route value is not physical.
573pub fn region_routes(
574    table_id: TableId,
575    table_route_value: &TableRouteValue,
576) -> Result<&Vec<RegionRoute>> {
577    table_route_value
578        .region_routes()
579        .with_context(|_| error::UnexpectedLogicalRouteTableSnafu {
580            err_msg: format!(
581                "TableRoute({:?}) is a non-physical TableRouteValue.",
582                table_id
583            ),
584        })
585}
586
/// A single step of the repartition group state machine.
///
/// Implementations are serialized via `typetag` so the current state can be
/// persisted in [`Procedure::dump`] and restored on recovery.
#[async_trait::async_trait]
#[typetag::serde(tag = "repartition_group_state")]
pub(crate) trait State: Sync + Send + Debug {
    /// Returns the short name of the state (last segment of the full type
    /// path), used for logging and tracing fields.
    fn name(&self) -> &'static str {
        let type_name = std::any::type_name::<Self>();
        // short name
        type_name.split("::").last().unwrap_or(type_name)
    }

    /// Yields the next [State] and [Status].
    async fn next(
        &mut self,
        ctx: &mut Context,
        procedure_ctx: &ProcedureContext,
    ) -> Result<(Box<dyn State>, Status)>;

    /// Returns `self` as `&dyn Any`, enabling downcasting to the concrete
    /// state type.
    fn as_any(&self) -> &dyn Any;
}
605
#[cfg(test)]
mod tests {
    // Fix: `use std::assert_matches;` only imports the module, so the bare
    // `assert_matches!(...)` invocations below would not resolve; the macro
    // itself must be imported (gated by the `assert_matches` feature).
    use std::assert_matches::assert_matches;
    use std::sync::Arc;

    use common_meta::key::TableMetadataManager;
    use common_meta::kv_backend::test_util::MockKvBackendBuilder;

    use crate::error::Error;
    use crate::procedure::repartition::test_util::{TestingEnv, new_persistent_context};

    #[tokio::test]
    async fn test_get_table_route_value_not_found_error() {
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context(1024, vec![], vec![]);
        let ctx = env.create_context(persistent_context);
        let err = ctx.get_table_route_value().await.unwrap_err();
        // A missing table route must abort (non-retryable).
        assert_matches!(err, Error::TableRouteNotFound { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_get_table_route_value_retry_error() {
        // KV backend whose `range` always fails, simulating a transient
        // storage error.
        let kv = MockKvBackendBuilder::default()
            .range_fn(Arc::new(|_| {
                common_meta::error::UnexpectedSnafu {
                    err_msg: "mock err",
                }
                .fail()
            }))
            .build()
            .unwrap();
        let mut env = TestingEnv::new();
        env.table_metadata_manager = Arc::new(TableMetadataManager::new(Arc::new(kv)));
        let persistent_context = new_persistent_context(1024, vec![], vec![]);
        let ctx = env.create_context(persistent_context);
        let err = ctx.get_table_route_value().await.unwrap_err();
        // Storage failures must be surfaced as retryable.
        assert!(err.is_retryable());
    }

    #[tokio::test]
    async fn test_get_datanode_table_value_retry_error() {
        // Same failing backend as above; the datanode-table lookup must also
        // be retryable on storage errors.
        let kv = MockKvBackendBuilder::default()
            .range_fn(Arc::new(|_| {
                common_meta::error::UnexpectedSnafu {
                    err_msg: "mock err",
                }
                .fail()
            }))
            .build()
            .unwrap();
        let mut env = TestingEnv::new();
        env.table_metadata_manager = Arc::new(TableMetadataManager::new(Arc::new(kv)));
        let persistent_context = new_persistent_context(1024, vec![], vec![]);
        let ctx = env.create_context(persistent_context);
        let err = ctx.get_datanode_table_value(1024, 1).await.unwrap_err();
        assert!(err.is_retryable());
    }
}