// meta_srv/procedure/repartition/group.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub(crate) mod apply_staging_manifest;
16pub(crate) mod enter_staging_region;
17pub(crate) mod remap_manifest;
18pub(crate) mod repartition_end;
19pub(crate) mod repartition_start;
20pub(crate) mod sync_region;
21pub(crate) mod update_metadata;
22pub(crate) mod utils;
23
24use std::any::Any;
25use std::collections::HashMap;
26use std::fmt::{Debug, Display};
27use std::time::{Duration, Instant};
28
29use common_error::ext::BoxedError;
30use common_meta::cache_invalidator::CacheInvalidatorRef;
31use common_meta::ddl::DdlContext;
32use common_meta::instruction::CacheIdent;
33use common_meta::key::datanode_table::{DatanodeTableValue, RegionInfo};
34use common_meta::key::table_route::TableRouteValue;
35use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
36use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock};
37use common_meta::peer::Peer;
38use common_meta::rpc::router::RegionRoute;
39use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
40use common_procedure::{
41    Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
42    Result as ProcedureResult, Status, StringKey, UserMetadata,
43};
44use common_telemetry::{error, info};
45use serde::{Deserialize, Serialize};
46use snafu::{OptionExt, ResultExt};
47use store_api::storage::{RegionId, TableId};
48use uuid::Uuid;
49
50use crate::error::{self, Result};
51use crate::procedure::repartition::group::repartition_start::RepartitionStart;
52use crate::procedure::repartition::plan::RegionDescriptor;
53use crate::procedure::repartition::utils::get_datanode_table_value;
54use crate::procedure::repartition::{self};
55use crate::service::mailbox::MailboxRef;
56
/// Wall-clock timings for each phase of a repartition group procedure.
///
/// All fields default to zero and are accumulated through the `update_*`
/// methods as the corresponding phase completes.
#[derive(Debug, Clone, Default)]
pub struct Metrics {
    /// Elapsed time of flushing pending deallocate regions.
    flush_pending_deallocate_regions_elapsed: Duration,
    /// Elapsed time of entering staging region.
    enter_staging_region_elapsed: Duration,
    /// Elapsed time of applying staging manifest.
    apply_staging_manifest_elapsed: Duration,
    /// Elapsed time of remapping manifest.
    remap_manifest_elapsed: Duration,
    /// Elapsed time of updating metadata.
    update_metadata_elapsed: Duration,
}
70
71impl Display for Metrics {
72    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73        let total = self.flush_pending_deallocate_regions_elapsed
74            + self.enter_staging_region_elapsed
75            + self.apply_staging_manifest_elapsed
76            + self.remap_manifest_elapsed
77            + self.update_metadata_elapsed;
78        write!(f, "total: {:?}", total)?;
79        let mut parts = Vec::with_capacity(5);
80        if self.flush_pending_deallocate_regions_elapsed > Duration::ZERO {
81            parts.push(format!(
82                "flush_pending_deallocate_regions_elapsed: {:?}",
83                self.flush_pending_deallocate_regions_elapsed
84            ));
85        }
86        if self.enter_staging_region_elapsed > Duration::ZERO {
87            parts.push(format!(
88                "enter_staging_region_elapsed: {:?}",
89                self.enter_staging_region_elapsed
90            ));
91        }
92        if self.apply_staging_manifest_elapsed > Duration::ZERO {
93            parts.push(format!(
94                "apply_staging_manifest_elapsed: {:?}",
95                self.apply_staging_manifest_elapsed
96            ));
97        }
98        if self.remap_manifest_elapsed > Duration::ZERO {
99            parts.push(format!(
100                "remap_manifest_elapsed: {:?}",
101                self.remap_manifest_elapsed
102            ));
103        }
104        if self.update_metadata_elapsed > Duration::ZERO {
105            parts.push(format!(
106                "update_metadata_elapsed: {:?}",
107                self.update_metadata_elapsed
108            ));
109        }
110
111        if !parts.is_empty() {
112            write!(f, ", {}", parts.join(", "))?;
113        }
114        Ok(())
115    }
116}
117
impl Metrics {
    /// Updates the elapsed time of entering staging region.
    pub fn update_enter_staging_region_elapsed(&mut self, elapsed: Duration) {
        self.enter_staging_region_elapsed += elapsed;
    }

    /// Updates the elapsed time of flushing pending deallocate regions.
    pub fn update_flush_pending_deallocate_regions_elapsed(&mut self, elapsed: Duration) {
        self.flush_pending_deallocate_regions_elapsed += elapsed;
    }

    /// Updates the elapsed time of applying staging manifest.
    pub fn update_apply_staging_manifest_elapsed(&mut self, elapsed: Duration) {
        self.apply_staging_manifest_elapsed += elapsed;
    }

    /// Updates the elapsed time of remapping manifest.
    pub fn update_remap_manifest_elapsed(&mut self, elapsed: Duration) {
        self.remap_manifest_elapsed += elapsed;
    }

    /// Updates the elapsed time of updating metadata.
    pub fn update_update_metadata_elapsed(&mut self, elapsed: Duration) {
        self.update_metadata_elapsed += elapsed;
    }
}
143
/// Unique identifier of a repartition group.
pub type GroupId = Uuid;
145
/// Procedure that repartitions a single group of regions by driving a
/// [State] machine, starting from the `RepartitionStart` state.
pub struct RepartitionGroupProcedure {
    // Current state of the state machine; replaced on every transition.
    state: Box<dyn State>,
    // Runtime context shared across states.
    context: Context,
}
150
/// Borrowed view of the procedure's serializable parts, used by
/// [RepartitionGroupProcedure]'s `dump` to avoid cloning on serialization.
#[derive(Debug, Serialize)]
struct RepartitionGroupData<'a> {
    persistent_ctx: &'a PersistentContext,
    state: &'a dyn State,
}
156
/// Owned counterpart of [RepartitionGroupData], used when restoring a
/// procedure from its JSON form in `from_json`.
#[derive(Debug, Deserialize)]
struct RepartitionGroupDataOwned {
    persistent_ctx: PersistentContext,
    state: Box<dyn State>,
}
162
163impl RepartitionGroupProcedure {
164    pub(crate) const TYPE_NAME: &'static str = "metasrv-procedure::RepartitionGroup";
165
166    pub fn new(persistent_context: PersistentContext, context: &repartition::Context) -> Self {
167        let state = Box::new(RepartitionStart);
168
169        Self {
170            state,
171            context: Context {
172                persistent_ctx: persistent_context,
173                cache_invalidator: context.cache_invalidator.clone(),
174                table_metadata_manager: context.table_metadata_manager.clone(),
175                mailbox: context.mailbox.clone(),
176                server_addr: context.server_addr.clone(),
177                start_time: Instant::now(),
178                volatile_ctx: VolatileContext::default(),
179            },
180        }
181    }
182
183    pub fn from_json<F>(json: &str, ctx_factory: F) -> ProcedureResult<Self>
184    where
185        F: FnOnce(PersistentContext) -> Context,
186    {
187        let RepartitionGroupDataOwned {
188            state,
189            persistent_ctx,
190        } = serde_json::from_str(json).context(FromJsonSnafu)?;
191        let context = ctx_factory(persistent_ctx);
192
193        Ok(Self { state, context })
194    }
195}
196
197#[async_trait::async_trait]
198impl Procedure for RepartitionGroupProcedure {
199    fn type_name(&self) -> &str {
200        Self::TYPE_NAME
201    }
202
203    #[tracing::instrument(skip_all, fields(
204        state = %self.state.name(),
205        table_id = self.context.persistent_ctx.table_id,
206        group_id = %self.context.persistent_ctx.group_id,
207    ))]
208    async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
209        let state = &mut self.state;
210        let state_name = state.name();
211        // Log state transition
212        common_telemetry::info!(
213            "Repartition group procedure executing state: {}, group id: {}, table id: {}",
214            state_name,
215            self.context.persistent_ctx.group_id,
216            self.context.persistent_ctx.table_id
217        );
218
219        match state.next(&mut self.context, _ctx).await {
220            Ok((next, status)) => {
221                *state = next;
222                Ok(status)
223            }
224            Err(e) => {
225                if e.is_retryable() {
226                    Err(ProcedureError::retry_later(e))
227                } else {
228                    error!(
229                        e;
230                        "Repartition group procedure failed, group id: {}, table id: {}",
231                        self.context.persistent_ctx.group_id,
232                        self.context.persistent_ctx.table_id,
233                    );
234                    Err(ProcedureError::external(e))
235                }
236            }
237        }
238    }
239
240    fn rollback_supported(&self) -> bool {
241        false
242    }
243
244    fn dump(&self) -> ProcedureResult<String> {
245        let data = RepartitionGroupData {
246            persistent_ctx: &self.context.persistent_ctx,
247            state: self.state.as_ref(),
248        };
249        serde_json::to_string(&data).context(ToJsonSnafu)
250    }
251
252    fn lock_key(&self) -> LockKey {
253        LockKey::new(self.context.persistent_ctx.lock_key())
254    }
255
256    fn user_metadata(&self) -> Option<UserMetadata> {
257        // TODO(weny): support user metadata.
258        None
259    }
260}
261
/// Runtime context shared by all states of a [RepartitionGroupProcedure].
pub struct Context {
    /// The part of the context that is persisted with the procedure.
    pub persistent_ctx: PersistentContext,

    /// Broadcasts cache invalidation messages (e.g. table cache).
    pub cache_invalidator: CacheInvalidatorRef,

    /// Accessor for table metadata: table routes, datanode table values and
    /// repart mappings.
    pub table_metadata_manager: TableMetadataManagerRef,

    /// Mailbox for sending messages from this metasrv.
    pub mailbox: MailboxRef,

    /// The address of this metasrv server.
    pub server_addr: String,

    /// When this run started; `next_operation_timeout` measures the remaining
    /// budget against it.
    pub start_time: Instant,

    /// Per-run, non-persistent state.
    pub volatile_ctx: VolatileContext,
}
277
/// State that lives only for the duration of one procedure run and is not
/// persisted across restarts.
#[derive(Debug, Clone, Default)]
pub struct VolatileContext {
    /// Per-phase elapsed-time metrics accumulated during this run.
    pub metrics: Metrics,
}
282
impl Context {
    /// Creates a [Context], sharing the cache invalidator and table metadata
    /// manager of the given DDL context.
    pub fn new(
        ddl_ctx: &DdlContext,
        mailbox: MailboxRef,
        server_addr: String,
        persistent_ctx: PersistentContext,
    ) -> Self {
        Self {
            persistent_ctx,
            cache_invalidator: ddl_ctx.cache_invalidator.clone(),
            table_metadata_manager: ddl_ctx.table_metadata_manager.clone(),
            mailbox,
            server_addr,
            // The operation timeout is measured from this instant.
            start_time: Instant::now(),
            volatile_ctx: VolatileContext::default(),
        }
    }
}
301
/// The result of the group preparation phase, containing validated region
/// routes. Persisted as part of [PersistentContext].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct GroupPrepareResult {
    /// The validated source region routes.
    pub source_routes: Vec<RegionRoute>,
    /// The validated target region routes.
    pub target_routes: Vec<RegionRoute>,
    /// The primary source region id (first source region), used for retrieving region options.
    pub central_region: RegionId,
    /// The peer where the primary source region is located.
    pub central_region_datanode: Peer,
}
314
/// The part of a repartition group procedure's state that is serialized and
/// survives metasrv restarts.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PersistentContext {
    /// The unique id of the repartition group.
    pub group_id: GroupId,
    /// The table id of the repartition group.
    pub table_id: TableId,
    /// The catalog name of the repartition group.
    pub catalog_name: String,
    /// The schema name of the repartition group.
    pub schema_name: String,
    /// The source regions of the repartition group.
    pub sources: Vec<RegionDescriptor>,
    /// The target regions of the repartition group.
    pub targets: Vec<RegionDescriptor>,
    /// For each `source region`, the corresponding
    /// `target regions` that overlap with it.
    pub region_mapping: HashMap<RegionId, Vec<RegionId>>,
    /// The result of group prepare.
    /// The value will be set in [RepartitionStart](crate::procedure::repartition::group::repartition_start::RepartitionStart) state.
    pub group_prepare_result: Option<GroupPrepareResult>,
    /// The staging manifest paths of the repartition group.
    /// The value will be set in [RemapManifest](crate::procedure::repartition::group::remap_manifest::RemapManifest) state.
    pub staging_manifest_paths: HashMap<RegionId, String>,
    /// Whether sync region is needed for this group.
    pub sync_region: bool,
    /// The region ids of the newly allocated regions.
    pub allocated_region_ids: Vec<RegionId>,
    /// The region ids of the regions that are pending deallocation.
    pub pending_deallocate_region_ids: Vec<RegionId>,
    /// The timeout for repartition operations.
    // Serialized in human-readable form (e.g. "10s") rather than seconds/nanos.
    #[serde(with = "humantime_serde")]
    pub timeout: Duration,
}
347
348impl PersistentContext {
349    #[allow(clippy::too_many_arguments)]
350    pub fn new(
351        group_id: GroupId,
352        table_id: TableId,
353        catalog_name: String,
354        schema_name: String,
355        sources: Vec<RegionDescriptor>,
356        targets: Vec<RegionDescriptor>,
357        region_mapping: HashMap<RegionId, Vec<RegionId>>,
358        sync_region: bool,
359        allocated_region_ids: Vec<RegionId>,
360        pending_deallocate_region_ids: Vec<RegionId>,
361        timeout: Duration,
362    ) -> Self {
363        Self {
364            group_id,
365            table_id,
366            catalog_name,
367            schema_name,
368            sources,
369            targets,
370            region_mapping,
371            group_prepare_result: None,
372            staging_manifest_paths: HashMap::new(),
373            sync_region,
374            allocated_region_ids,
375            pending_deallocate_region_ids,
376            timeout,
377        }
378    }
379
380    pub fn lock_key(&self) -> Vec<StringKey> {
381        let mut lock_keys = Vec::with_capacity(2 + self.sources.len());
382        lock_keys.extend([
383            CatalogLock::Read(&self.catalog_name).into(),
384            SchemaLock::read(&self.catalog_name, &self.schema_name).into(),
385        ]);
386        for source in &self.sources {
387            lock_keys.push(RegionLock::Write(source.region_id).into());
388        }
389        lock_keys
390    }
391}
392
impl Context {
    /// Retrieves the table route value for the given table id.
    ///
    /// Retry:
    /// - Failed to retrieve the metadata of table.
    ///
    /// Abort:
    /// - Table route not found.
    pub async fn get_table_route_value(
        &self,
    ) -> Result<DeserializedValueWithBytes<TableRouteValue>> {
        let table_id = self.persistent_ctx.table_id;
        let group_id = self.persistent_ctx.group_id;
        let table_route_value = self
            .table_metadata_manager
            .table_route_manager()
            .table_route_storage()
            .get_with_raw_bytes(table_id)
            .await
            .map_err(BoxedError::new)
            // Storage errors are treated as transient: surface a retryable error.
            .with_context(|_| error::RetryLaterWithSourceSnafu {
                reason: format!(
                    "Failed to get table route for table: {}, repartition group: {}",
                    table_id, group_id
                ),
            })?
            // A missing route cannot be fixed by retrying: abort the procedure.
            .context(error::TableRouteNotFoundSnafu { table_id })?;

        Ok(table_route_value)
    }

    /// Returns the `datanode_table_value`
    ///
    /// Retry:
    /// - Failed to retrieve the metadata of datanode table.
    pub async fn get_datanode_table_value(
        &self,
        table_id: TableId,
        datanode_id: u64,
    ) -> Result<DatanodeTableValue> {
        // Thin wrapper around the shared repartition helper.
        get_datanode_table_value(&self.table_metadata_manager, table_id, datanode_id).await
    }

    /// Broadcasts the invalidate table cache message.
    ///
    /// Best-effort: the invalidation result is deliberately discarded, so this
    /// always returns `Ok(())`.
    pub async fn invalidate_table_cache(&self) -> Result<()> {
        let table_id = self.persistent_ctx.table_id;
        let group_id = self.persistent_ctx.group_id;
        let subject = format!(
            "Invalidate table cache for repartition table, group: {}, table: {}",
            group_id, table_id,
        );
        let ctx = common_meta::cache_invalidator::Context {
            subject: Some(subject),
        };
        // `let _ =` intentionally ignores failures: a stale cache is tolerable.
        let _ = self
            .cache_invalidator
            .invalidate(&ctx, &[CacheIdent::TableId(table_id)])
            .await;
        Ok(())
    }

    /// Updates the table route.
    ///
    /// Region options and WAL options are taken from the datanode table value
    /// of the central region's datanode.
    ///
    /// Retry:
    /// - Failed to retrieve the metadata of datanode table.
    ///
    /// Abort:
    /// - Table route not found.
    /// - Failed to update the table route.
    pub async fn update_table_route(
        &self,
        current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
        new_region_routes: Vec<RegionRoute>,
    ) -> Result<()> {
        let table_id = self.persistent_ctx.table_id;
        let group_id = self.persistent_ctx.group_id;
        // Safety: prepare result is set in [RepartitionStart] state.
        let prepare_result = self.persistent_ctx.group_prepare_result.as_ref().unwrap();
        let central_region_datanode_table_value = self
            .get_datanode_table_value(table_id, prepare_result.central_region_datanode.id)
            .await?;
        let RegionInfo {
            region_options,
            region_wal_options,
            ..
        } = &central_region_datanode_table_value.region_info;

        info!(
            "Updating table route for table: {}, group_id: {}, new region routes: {:?}",
            table_id, group_id, new_region_routes
        );
        self.table_metadata_manager
            .update_table_route(
                table_id,
                central_region_datanode_table_value.region_info.clone(),
                current_table_route_value,
                new_region_routes,
                region_options,
                region_wal_options,
            )
            .await
            .context(error::TableMetadataManagerSnafu)
    }

    /// Updates the table repart mapping.
    pub async fn update_table_repart_mapping(&self) -> Result<()> {
        info!(
            "Updating table repart mapping for table: {}, group_id: {}, region mapping: {:?}",
            self.persistent_ctx.table_id,
            self.persistent_ctx.group_id,
            self.persistent_ctx.region_mapping
        );

        self.table_metadata_manager
            .table_repart_manager()
            .update_mappings(
                self.persistent_ctx.table_id,
                &self.persistent_ctx.region_mapping,
            )
            .await
            .context(error::TableMetadataManagerSnafu)
    }

    /// Returns the next operation timeout.
    ///
    /// If the next operation timeout is not set, it will return `None`.
    pub fn next_operation_timeout(&self) -> Option<Duration> {
        // `None` once the elapsed time since `start_time` exceeds the budget.
        self.persistent_ctx
            .timeout
            .checked_sub(self.start_time.elapsed())
    }

    /// Updates the elapsed time of entering staging region.
    pub fn update_enter_staging_region_elapsed(&mut self, elapsed: Duration) {
        self.volatile_ctx
            .metrics
            .update_enter_staging_region_elapsed(elapsed);
    }

    /// Updates the elapsed time of flushing pending deallocate regions.
    pub fn update_flush_pending_deallocate_regions_elapsed(&mut self, elapsed: Duration) {
        self.volatile_ctx
            .metrics
            .update_flush_pending_deallocate_regions_elapsed(elapsed);
    }

    /// Updates the elapsed time of applying staging manifest.
    pub fn update_apply_staging_manifest_elapsed(&mut self, elapsed: Duration) {
        self.volatile_ctx
            .metrics
            .update_apply_staging_manifest_elapsed(elapsed);
    }

    /// Updates the elapsed time of remapping manifest.
    pub fn update_remap_manifest_elapsed(&mut self, elapsed: Duration) {
        self.volatile_ctx
            .metrics
            .update_remap_manifest_elapsed(elapsed);
    }

    /// Updates the elapsed time of updating metadata.
    pub fn update_update_metadata_elapsed(&mut self, elapsed: Duration) {
        self.volatile_ctx
            .metrics
            .update_update_metadata_elapsed(elapsed);
    }
}
560
561/// Returns the region routes of the given table route value.
562///
563/// Abort:
564/// - Table route value is not physical.
565pub fn region_routes(
566    table_id: TableId,
567    table_route_value: &TableRouteValue,
568) -> Result<&Vec<RegionRoute>> {
569    table_route_value
570        .region_routes()
571        .with_context(|_| error::UnexpectedLogicalRouteTableSnafu {
572            err_msg: format!(
573                "TableRoute({:?}) is a non-physical TableRouteValue.",
574                table_id
575            ),
576        })
577}
578
/// A state of the repartition group state machine.
///
/// Implementations are serialized via `typetag` under the
/// `repartition_group_state` tag so the procedure can be dumped and restored.
#[async_trait::async_trait]
#[typetag::serde(tag = "repartition_group_state")]
pub(crate) trait State: Sync + Send + Debug {
    /// Returns the short name of the state: the last `::`-separated segment
    /// of the implementing type's name.
    fn name(&self) -> &'static str {
        let type_name = std::any::type_name::<Self>();
        // short name
        type_name.split("::").last().unwrap_or(type_name)
    }

    /// Yields the next [State] and [Status].
    async fn next(
        &mut self,
        ctx: &mut Context,
        procedure_ctx: &ProcedureContext,
    ) -> Result<(Box<dyn State>, Status)>;

    /// Returns `self` as [Any], allowing downcasting to a concrete state.
    fn as_any(&self) -> &dyn Any;
}
597
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::sync::Arc;

    use common_meta::key::TableMetadataManager;
    use common_meta::kv_backend::test_util::MockKvBackendBuilder;

    use crate::error::Error;
    use crate::procedure::repartition::test_util::{TestingEnv, new_persistent_context};

    // A missing table route must abort with a non-retryable `TableRouteNotFound`.
    #[tokio::test]
    async fn test_get_table_route_value_not_found_error() {
        let env = TestingEnv::new();
        let persistent_context = new_persistent_context(1024, vec![], vec![]);
        let ctx = env.create_context(persistent_context);
        let err = ctx.get_table_route_value().await.unwrap_err();
        assert_matches!(err, Error::TableRouteNotFound { .. });
        assert!(!err.is_retryable());
    }

    // A kv-backend failure while reading the table route must be retryable.
    #[tokio::test]
    async fn test_get_table_route_value_retry_error() {
        let kv = MockKvBackendBuilder::default()
            .range_fn(Arc::new(|_| {
                common_meta::error::UnexpectedSnafu {
                    err_msg: "mock err",
                }
                .fail()
            }))
            .build()
            .unwrap();
        let mut env = TestingEnv::new();
        env.table_metadata_manager = Arc::new(TableMetadataManager::new(Arc::new(kv)));
        let persistent_context = new_persistent_context(1024, vec![], vec![]);
        let ctx = env.create_context(persistent_context);
        let err = ctx.get_table_route_value().await.unwrap_err();
        assert!(err.is_retryable());
    }

    // A kv-backend failure while reading the datanode table must be retryable.
    #[tokio::test]
    async fn test_get_datanode_table_value_retry_error() {
        let kv = MockKvBackendBuilder::default()
            .range_fn(Arc::new(|_| {
                common_meta::error::UnexpectedSnafu {
                    err_msg: "mock err",
                }
                .fail()
            }))
            .build()
            .unwrap();
        let mut env = TestingEnv::new();
        env.table_metadata_manager = Arc::new(TableMetadataManager::new(Arc::new(kv)));
        let persistent_context = new_persistent_context(1024, vec![], vec![]);
        let ctx = env.create_context(persistent_context);
        let err = ctx.get_datanode_table_value(1024, 1).await.unwrap_err();
        assert!(err.is_retryable());
    }
}