meta_srv/procedure/repartition.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod allocate_region;
16pub mod collect;
17pub mod deallocate_region;
18pub mod dispatch;
19pub mod group;
20pub mod plan;
21pub mod repartition_end;
22pub mod repartition_start;
23pub mod utils;
24
25use std::any::Any;
26use std::collections::HashMap;
27use std::fmt::{Debug, Display};
28use std::time::{Duration, Instant};
29
30use common_error::ext::BoxedError;
31use common_meta::cache_invalidator::CacheInvalidatorRef;
32use common_meta::ddl::DdlContext;
33use common_meta::ddl::allocator::region_routes::RegionRoutesAllocatorRef;
34use common_meta::ddl::allocator::wal_options::WalOptionsAllocatorRef;
35use common_meta::ddl_manager::RepartitionProcedureFactory;
36use common_meta::instruction::CacheIdent;
37use common_meta::key::datanode_table::RegionInfo;
38use common_meta::key::table_info::TableInfoValue;
39use common_meta::key::table_route::TableRouteValue;
40use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
41use common_meta::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock};
42use common_meta::node_manager::NodeManagerRef;
43use common_meta::region_keeper::MemoryRegionKeeperRef;
44use common_meta::region_registry::LeaderRegionRegistryRef;
45use common_meta::rpc::router::RegionRoute;
46use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
47use common_procedure::{
48    BoxedProcedure, Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
49    ProcedureManagerRef, Result as ProcedureResult, Status, StringKey, UserMetadata,
50};
51use common_telemetry::{error, info};
52use partition::expr::PartitionExpr;
53use serde::{Deserialize, Serialize};
54use snafu::{OptionExt, ResultExt};
55use store_api::storage::{RegionNumber, TableId};
56use table::table_name::TableName;
57
58use crate::error::{self, Result};
59use crate::procedure::repartition::group::{
60    Context as RepartitionGroupContext, RepartitionGroupProcedure,
61};
62use crate::procedure::repartition::plan::RepartitionPlanEntry;
63use crate::procedure::repartition::repartition_start::RepartitionStart;
64use crate::procedure::repartition::utils::get_datanode_table_value;
65use crate::service::mailbox::MailboxRef;
66
67#[cfg(test)]
68pub mod test_util;
69
/// State of a repartition procedure that is persisted together with the procedure.
///
/// It is serialized next to the current state when the procedure is dumped, and handed back
/// to the context factory when the procedure is reloaded from JSON.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PersistentContext {
    /// Catalog of the table being repartitioned.
    pub catalog_name: String,
    /// Schema of the table being repartitioned.
    pub schema_name: String,
    /// Name of the table being repartitioned.
    pub table_name: String,
    /// Id of the table being repartitioned.
    pub table_id: TableId,
    /// Repartition plan entries; empty when created via `PersistentContext::new`.
    pub plans: Vec<RepartitionPlanEntry>,
    /// The timeout for repartition operations.
    ///
    /// Serialized in human-readable form via `humantime_serde`; when the field is missing
    /// from persisted data it falls back to `default_timeout()` (2 minutes).
    #[serde(with = "humantime_serde", default = "default_timeout")]
    pub timeout: Duration,
}
81
/// Default timeout for repartition operations (two minutes).
///
/// Used both by `PersistentContext::new` when no timeout is given and by serde when a
/// persisted context lacks the `timeout` field.
fn default_timeout() -> Duration {
    const TWO_MINUTES_IN_SECS: u64 = 2 * 60;
    Duration::from_secs(TWO_MINUTES_IN_SECS)
}
85
86impl PersistentContext {
87    /// Creates a new [PersistentContext] with the given table name, table id and timeout.
88    ///
89    /// If the timeout is not provided, the default timeout will be used.
90    pub fn new(
91        TableName {
92            catalog_name,
93            schema_name,
94            table_name,
95        }: TableName,
96        table_id: TableId,
97        timeout: Option<Duration>,
98    ) -> Self {
99        Self {
100            catalog_name,
101            schema_name,
102            table_name,
103            table_id,
104            plans: vec![],
105            timeout: timeout.unwrap_or_else(default_timeout),
106        }
107    }
108
109    pub fn lock_key(&self) -> Vec<StringKey> {
110        vec![
111            CatalogLock::Read(&self.catalog_name).into(),
112            SchemaLock::read(&self.catalog_name, &self.schema_name).into(),
113            TableLock::Write(self.table_id).into(),
114            TableNameLock::new(&self.catalog_name, &self.schema_name, &self.table_name).into(),
115        ]
116    }
117}
118
/// Runtime context of a [RepartitionProcedure].
///
/// Combines the persisted [PersistentContext], per-run [VolatileContext], and the shared
/// services taken from the [DdlContext] in `Context::new`.
#[derive(Clone)]
pub struct Context {
    /// State persisted with the procedure.
    pub persistent_ctx: PersistentContext,
    /// In-memory state that is not persisted (metrics, dispatch start time).
    pub volatile_ctx: VolatileContext,
    /// Access to table metadata (table routes, table info, datanode tables).
    pub table_metadata_manager: TableMetadataManagerRef,
    pub memory_region_keeper: MemoryRegionKeeperRef,
    pub node_manager: NodeManagerRef,
    pub leader_region_registry: LeaderRegionRegistryRef,
    pub mailbox: MailboxRef,
    /// Server address handed in by the factory.
    /// NOTE(review): presumably this metasrv's own address — confirm.
    pub server_addr: String,
    /// Used to broadcast table cache invalidation after metadata changes.
    pub cache_invalidator: CacheInvalidatorRef,
    pub region_routes_allocator: RegionRoutesAllocatorRef,
    pub wal_options_allocator: WalOptionsAllocatorRef,
    /// Instant at which this context was built; `next_operation_timeout` measures the
    /// persisted `timeout` from here.
    /// NOTE(review): this is reset every time the context is rebuilt (e.g. when the
    /// procedure is reloaded), so the timeout budget restarts — confirm that is intended.
    pub start_time: Instant,
}
134
/// Non-persisted state of a repartition procedure.
///
/// `Context::new` always starts from `VolatileContext::default()`, so none of this survives
/// a procedure reload.
#[derive(Debug, Clone, Default)]
pub struct VolatileContext {
    /// Per-phase elapsed-time metrics.
    pub metrics: Metrics,
    /// `None` until assigned; presumably the instant the dispatch phase started.
    /// NOTE(review): written outside this file — confirm against the dispatch state.
    pub dispatch_start_time: Option<Instant>,
}
140
/// Metrics of repartition.
///
/// Each field is a running total for one phase: the `update_*` methods add to the stored
/// duration rather than overwrite it. All fields start at zero.
#[derive(Debug, Clone, Default)]
pub struct Metrics {
    /// Elapsed time of building plan.
    build_plan_elapsed: Duration,
    /// Elapsed time of allocating region.
    allocate_region_elapsed: Duration,
    /// Elapsed time of finishing groups.
    finish_groups_elapsed: Duration,
    /// Elapsed time of deallocating region.
    deallocate_region_elapsed: Duration,
}
153
154impl Display for Metrics {
155    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
156        let total = self.build_plan_elapsed
157            + self.allocate_region_elapsed
158            + self.finish_groups_elapsed
159            + self.deallocate_region_elapsed;
160        write!(f, "total: {:?}", total)?;
161        let mut parts = Vec::with_capacity(4);
162        if self.build_plan_elapsed > Duration::ZERO {
163            parts.push(format!("build_plan_elapsed: {:?}", self.build_plan_elapsed));
164        }
165        if self.allocate_region_elapsed > Duration::ZERO {
166            parts.push(format!(
167                "allocate_region_elapsed: {:?}",
168                self.allocate_region_elapsed
169            ));
170        }
171        if self.finish_groups_elapsed > Duration::ZERO {
172            parts.push(format!(
173                "finish_groups_elapsed: {:?}",
174                self.finish_groups_elapsed
175            ));
176        }
177        if self.deallocate_region_elapsed > Duration::ZERO {
178            parts.push(format!(
179                "deallocate_region_elapsed: {:?}",
180                self.deallocate_region_elapsed
181            ));
182        }
183
184        if !parts.is_empty() {
185            write!(f, ", {}", parts.join(", "))?;
186        }
187        Ok(())
188    }
189}
190
191impl Metrics {
192    /// Updates the elapsed time of building plan.
193    pub fn update_build_plan_elapsed(&mut self, elapsed: Duration) {
194        self.build_plan_elapsed += elapsed;
195    }
196
197    /// Updates the elapsed time of allocating region.
198    pub fn update_allocate_region_elapsed(&mut self, elapsed: Duration) {
199        self.allocate_region_elapsed += elapsed;
200    }
201
202    /// Updates the elapsed time of finishing groups.
203    pub fn update_finish_groups_elapsed(&mut self, elapsed: Duration) {
204        self.finish_groups_elapsed += elapsed;
205    }
206
207    /// Updates the elapsed time of deallocating region.
208    pub fn update_deallocate_region_elapsed(&mut self, elapsed: Duration) {
209        self.deallocate_region_elapsed += elapsed;
210    }
211}
212
213impl Context {
214    pub fn new(
215        ddl_ctx: &DdlContext,
216        mailbox: MailboxRef,
217        server_addr: String,
218        persistent_ctx: PersistentContext,
219    ) -> Self {
220        Self {
221            persistent_ctx,
222            table_metadata_manager: ddl_ctx.table_metadata_manager.clone(),
223            memory_region_keeper: ddl_ctx.memory_region_keeper.clone(),
224            node_manager: ddl_ctx.node_manager.clone(),
225            leader_region_registry: ddl_ctx.leader_region_registry.clone(),
226            mailbox,
227            server_addr,
228            cache_invalidator: ddl_ctx.cache_invalidator.clone(),
229            region_routes_allocator: ddl_ctx.table_metadata_allocator.region_routes_allocator(),
230            wal_options_allocator: ddl_ctx.table_metadata_allocator.wal_options_allocator(),
231            start_time: Instant::now(),
232            volatile_ctx: VolatileContext::default(),
233        }
234    }
235
236    /// Returns the next operation's timeout.
237    pub fn next_operation_timeout(&self) -> Option<Duration> {
238        self.persistent_ctx
239            .timeout
240            .checked_sub(self.start_time.elapsed())
241    }
242
243    /// Updates the elapsed time of building plan.
244    pub fn update_build_plan_elapsed(&mut self, elapsed: Duration) {
245        self.volatile_ctx.metrics.update_build_plan_elapsed(elapsed);
246    }
247
248    /// Updates the elapsed time of allocating region.
249    pub fn update_allocate_region_elapsed(&mut self, elapsed: Duration) {
250        self.volatile_ctx
251            .metrics
252            .update_allocate_region_elapsed(elapsed);
253    }
254
255    /// Updates the elapsed time of finishing groups.
256    pub fn update_finish_groups_elapsed(&mut self, elapsed: Duration) {
257        self.volatile_ctx
258            .metrics
259            .update_finish_groups_elapsed(elapsed);
260    }
261
262    /// Updates the elapsed time of deallocating region.
263    pub fn update_deallocate_region_elapsed(&mut self, elapsed: Duration) {
264        self.volatile_ctx
265            .metrics
266            .update_deallocate_region_elapsed(elapsed);
267    }
268
269    /// Retrieves the table route value for the given table id.
270    ///
271    /// Retry:
272    /// - Failed to retrieve the metadata of table.
273    ///
274    /// Abort:
275    /// - Table route not found.
276    pub async fn get_table_route_value(
277        &self,
278    ) -> Result<DeserializedValueWithBytes<TableRouteValue>> {
279        let table_id = self.persistent_ctx.table_id;
280        let table_route_value = self
281            .table_metadata_manager
282            .table_route_manager()
283            .table_route_storage()
284            .get_with_raw_bytes(table_id)
285            .await
286            .map_err(BoxedError::new)
287            .with_context(|_| error::RetryLaterWithSourceSnafu {
288                reason: format!("Failed to get table route for table: {}", table_id),
289            })?
290            .context(error::TableRouteNotFoundSnafu { table_id })?;
291
292        Ok(table_route_value)
293    }
294
295    /// Retrieves the table info value for the given table id.
296    ///
297    /// Retry:
298    /// - Failed to retrieve the metadata of table.
299    ///
300    /// Abort:
301    /// - Table info not found.
302    pub async fn get_table_info_value(&self) -> Result<TableInfoValue> {
303        let table_id = self.persistent_ctx.table_id;
304        let table_info_value = self
305            .table_metadata_manager
306            .table_info_manager()
307            .get(table_id)
308            .await
309            .map_err(BoxedError::new)
310            .with_context(|_| error::RetryLaterWithSourceSnafu {
311                reason: format!("Failed to get table info for table: {}", table_id),
312            })?
313            .context(error::TableInfoNotFoundSnafu { table_id })?
314            .into_inner();
315        Ok(table_info_value)
316    }
317
318    /// Updates the table route.
319    ///
320    /// Retry:
321    /// - Failed to retrieve the metadata of datanode table.
322    ///
323    /// Abort:
324    /// - Table route not found.
325    /// - Failed to update the table route.
326    pub async fn update_table_route(
327        &self,
328        current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
329        new_region_routes: Vec<RegionRoute>,
330        new_region_wal_options: HashMap<RegionNumber, String>,
331    ) -> Result<()> {
332        let table_id = self.persistent_ctx.table_id;
333        if new_region_routes.is_empty() {
334            return error::UnexpectedSnafu {
335                violated: format!("new_region_routes is empty for table: {}", table_id),
336            }
337            .fail();
338        }
339        let datanode_id = new_region_routes
340            .first()
341            .unwrap()
342            .leader_peer
343            .as_ref()
344            .context(error::NoLeaderSnafu)?
345            .id;
346        let datanode_table_value =
347            get_datanode_table_value(&self.table_metadata_manager, table_id, datanode_id).await?;
348
349        let RegionInfo {
350            region_options,
351            region_wal_options,
352            ..
353        } = &datanode_table_value.region_info;
354
355        // Merge and validate the new region wal options.
356        let validated_region_wal_options =
357            crate::procedure::repartition::utils::merge_and_validate_region_wal_options(
358                region_wal_options,
359                new_region_wal_options,
360                &new_region_routes,
361                table_id,
362            )?;
363        info!(
364            "Updating table route for table: {}, new region routes: {:?}",
365            table_id, new_region_routes
366        );
367        self.table_metadata_manager
368            .update_table_route(
369                table_id,
370                datanode_table_value.region_info.clone(),
371                current_table_route_value,
372                new_region_routes,
373                region_options,
374                &validated_region_wal_options,
375            )
376            .await
377            .context(error::TableMetadataManagerSnafu)
378    }
379
380    /// Broadcasts the invalidate table cache message.
381    pub async fn invalidate_table_cache(&self) -> Result<()> {
382        let table_id = self.persistent_ctx.table_id;
383        let subject = format!(
384            "Invalidate table cache for repartition table, table: {}",
385            table_id,
386        );
387        let ctx = common_meta::cache_invalidator::Context {
388            subject: Some(subject),
389        };
390        let _ = self
391            .cache_invalidator
392            .invalidate(&ctx, &[CacheIdent::TableId(table_id)])
393            .await;
394        Ok(())
395    }
396}
397
/// A single step of the repartition state machine driven by [RepartitionProcedure].
///
/// Implementations are serialized polymorphically through `typetag`, tagged by the
/// `repartition_state` field, so the current state can be dumped and restored.
#[async_trait::async_trait]
#[typetag::serde(tag = "repartition_state")]
pub(crate) trait State: Sync + Send + Debug {
    /// Returns a short name of the implementing type, used when logging state execution.
    ///
    /// This is the text after the last `::` in `std::any::type_name::<Self>()`.
    /// NOTE(review): for generic types that segment would come from inside the generic
    /// arguments (e.g. `Bar>`); fine for the current non-generic states.
    fn name(&self) -> &'static str {
        let type_name = std::any::type_name::<Self>();
        // Keep only the last path segment, e.g. `a::b::RepartitionStart` -> `RepartitionStart`.
        type_name.split("::").last().unwrap_or(type_name)
    }

    /// Yields the next [State] and [Status].
    ///
    /// May mutate `ctx`. On error, the caller keeps the current state and decides between
    /// retrying later and failing the procedure based on the error's retryability.
    async fn next(
        &mut self,
        ctx: &mut Context,
        procedure_ctx: &ProcedureContext,
    ) -> Result<(Box<dyn State>, Status)>;

    /// Returns `self` as [Any] so callers can downcast to a concrete state.
    fn as_any(&self) -> &dyn Any;
}
416
/// Procedure that repartitions a table by driving a [State] machine.
pub struct RepartitionProcedure {
    /// Current state; replaced by the state yielded from each successful `next` call.
    state: Box<dyn State>,
    /// Runtime context shared by all states.
    context: Context,
}
421
/// Borrowed view of a [RepartitionProcedure] that is serialized when the procedure is dumped.
///
/// Must stay field-compatible with [RepartitionDataOwned], which deserializes the same JSON.
#[derive(Debug, Serialize)]
struct RepartitionData<'a> {
    state: &'a dyn State,
    persistent_ctx: &'a PersistentContext,
}
427
/// Owned counterpart of [RepartitionData], used by `RepartitionProcedure::from_json` to
/// restore a dumped procedure.
#[derive(Debug, Deserialize)]
struct RepartitionDataOwned {
    state: Box<dyn State>,
    persistent_ctx: PersistentContext,
}
433
434impl RepartitionProcedure {
435    const TYPE_NAME: &'static str = "metasrv-procedure::Repartition";
436
437    pub fn new(
438        from_exprs: Vec<PartitionExpr>,
439        to_exprs: Vec<PartitionExpr>,
440        context: Context,
441    ) -> Self {
442        let state = Box::new(RepartitionStart::new(from_exprs, to_exprs));
443
444        Self { state, context }
445    }
446
447    pub fn from_json<F>(json: &str, ctx_factory: F) -> ProcedureResult<Self>
448    where
449        F: FnOnce(PersistentContext) -> Context,
450    {
451        let RepartitionDataOwned {
452            state,
453            persistent_ctx,
454        } = serde_json::from_str(json).context(FromJsonSnafu)?;
455        let context = ctx_factory(persistent_ctx);
456
457        Ok(Self { state, context })
458    }
459}
460
461#[async_trait::async_trait]
462impl Procedure for RepartitionProcedure {
463    fn type_name(&self) -> &str {
464        Self::TYPE_NAME
465    }
466
467    async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
468        let state = &mut self.state;
469        let state_name = state.name();
470        // Log state transition
471        common_telemetry::info!(
472            "Repartition procedure executing state: {}, table_id: {}",
473            state_name,
474            self.context.persistent_ctx.table_id
475        );
476        match state.next(&mut self.context, _ctx).await {
477            Ok((next, status)) => {
478                *state = next;
479                Ok(status)
480            }
481            Err(e) => {
482                if e.is_retryable() {
483                    Err(ProcedureError::retry_later(e))
484                } else {
485                    error!(
486                        e;
487                        "Repartition procedure failed, table id: {}",
488                        self.context.persistent_ctx.table_id,
489                    );
490                    Err(ProcedureError::external(e))
491                }
492            }
493        }
494    }
495
496    fn rollback_supported(&self) -> bool {
497        // TODO(weny): support rollback.
498        false
499    }
500
501    fn dump(&self) -> ProcedureResult<String> {
502        let data = RepartitionData {
503            state: self.state.as_ref(),
504            persistent_ctx: &self.context.persistent_ctx,
505        };
506        serde_json::to_string(&data).context(ToJsonSnafu)
507    }
508
509    fn lock_key(&self) -> LockKey {
510        LockKey::new(self.context.persistent_ctx.lock_key())
511    }
512
513    fn user_metadata(&self) -> Option<UserMetadata> {
514        // TODO(weny): support user metadata.
515        None
516    }
517}
518
/// Default [RepartitionProcedureFactory] implementation.
///
/// Holds the mailbox and server address that are injected into every [Context] it builds,
/// both for newly created procedures and for procedures reloaded from storage.
pub struct DefaultRepartitionProcedureFactory {
    mailbox: MailboxRef,
    server_addr: String,
}
523
524impl DefaultRepartitionProcedureFactory {
525    pub fn new(mailbox: MailboxRef, server_addr: String) -> Self {
526        Self {
527            mailbox,
528            server_addr,
529        }
530    }
531}
532
533impl RepartitionProcedureFactory for DefaultRepartitionProcedureFactory {
534    fn create(
535        &self,
536        ddl_ctx: &DdlContext,
537        table_name: TableName,
538        table_id: TableId,
539        from_exprs: Vec<String>,
540        to_exprs: Vec<String>,
541        timeout: Option<Duration>,
542    ) -> std::result::Result<BoxedProcedure, BoxedError> {
543        let persistent_ctx = PersistentContext::new(table_name, table_id, timeout);
544        let from_exprs = from_exprs
545            .iter()
546            .map(|e| {
547                PartitionExpr::from_json_str(e)
548                    .context(error::DeserializePartitionExprSnafu)?
549                    .context(error::EmptyPartitionExprSnafu)
550            })
551            .collect::<Result<Vec<_>>>()
552            .map_err(BoxedError::new)?;
553        let to_exprs = to_exprs
554            .iter()
555            .map(|e| {
556                PartitionExpr::from_json_str(e)
557                    .context(error::DeserializePartitionExprSnafu)?
558                    .context(error::EmptyPartitionExprSnafu)
559            })
560            .collect::<Result<Vec<_>>>()
561            .map_err(BoxedError::new)?;
562
563        let procedure = RepartitionProcedure::new(
564            from_exprs,
565            to_exprs,
566            Context::new(
567                ddl_ctx,
568                self.mailbox.clone(),
569                self.server_addr.clone(),
570                persistent_ctx,
571            ),
572        );
573
574        Ok(Box::new(procedure))
575    }
576
577    fn register_loaders(
578        &self,
579        ddl_ctx: &DdlContext,
580        procedure_manager: &ProcedureManagerRef,
581    ) -> std::result::Result<(), BoxedError> {
582        // Registers the repartition procedure loader.
583        let mailbox = self.mailbox.clone();
584        let server_addr = self.server_addr.clone();
585        let moved_ddl_ctx = ddl_ctx.clone();
586        procedure_manager
587            .register_loader(
588                RepartitionProcedure::TYPE_NAME,
589                Box::new(move |json| {
590                    let mailbox = mailbox.clone();
591                    let server_addr = server_addr.clone();
592                    let ddl_ctx = moved_ddl_ctx.clone();
593                    let factory = move |persistent_ctx| {
594                        Context::new(&ddl_ctx, mailbox, server_addr, persistent_ctx)
595                    };
596                    RepartitionProcedure::from_json(json, factory).map(|p| Box::new(p) as _)
597                }),
598            )
599            .map_err(BoxedError::new)?;
600
601        // Registers the repartition group procedure loader.
602        let mailbox = self.mailbox.clone();
603        let server_addr = self.server_addr.clone();
604        let moved_ddl_ctx = ddl_ctx.clone();
605        procedure_manager
606            .register_loader(
607                RepartitionGroupProcedure::TYPE_NAME,
608                Box::new(move |json| {
609                    let mailbox = mailbox.clone();
610                    let server_addr = server_addr.clone();
611                    let ddl_ctx = moved_ddl_ctx.clone();
612                    let factory = move |persistent_ctx| {
613                        RepartitionGroupContext::new(&ddl_ctx, mailbox, server_addr, persistent_ctx)
614                    };
615                    RepartitionGroupProcedure::from_json(json, factory).map(|p| Box::new(p) as _)
616                }),
617            )
618            .map_err(BoxedError::new)?;
619
620        Ok(())
621    }
622}