meta_srv/procedure/repartition/
allocate_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::collections::{HashMap, HashSet};
17
18use common_meta::ddl::create_table::executor::CreateTableExecutor;
19use common_meta::ddl::create_table::template::{
20    CreateRequestBuilder, build_template_from_raw_table_info_for_physical_table,
21};
22use common_meta::lock_key::TableLock;
23use common_meta::node_manager::NodeManagerRef;
24use common_meta::region_keeper::{MemoryRegionKeeperRef, OperatingRegionGuard};
25use common_meta::rpc::router::{RegionRoute, operating_leader_regions};
26use common_procedure::{Context as ProcedureContext, Status};
27use common_telemetry::info;
28use serde::{Deserialize, Serialize};
29use snafu::{OptionExt, ResultExt};
30use store_api::storage::{RegionNumber, TableId};
31use table::metadata::RawTableInfo;
32use table::table_reference::TableReference;
33use tokio::time::Instant;
34
35use crate::error::{self, Result};
36use crate::procedure::repartition::dispatch::Dispatch;
37use crate::procedure::repartition::plan::{
38    AllocationPlanEntry, RegionDescriptor, RepartitionPlanEntry,
39    convert_allocation_plan_to_repartition_plan,
40};
41use crate::procedure::repartition::{Context, State};
42
43#[derive(Debug, Clone, Serialize, Deserialize)]
44pub struct AllocateRegion {
45    plan_entries: Vec<AllocationPlanEntry>,
46}
47
48#[async_trait::async_trait]
49#[typetag::serde]
50impl State for AllocateRegion {
51    async fn next(
52        &mut self,
53        ctx: &mut Context,
54        procedure_ctx: &ProcedureContext,
55    ) -> Result<(Box<dyn State>, Status)> {
56        let timer = Instant::now();
57        let table_id = ctx.persistent_ctx.table_id;
58        let table_route_value = ctx.get_table_route_value().await?;
59        // Safety: it is physical table route value.
60        let region_routes = table_route_value.region_routes().unwrap();
61        let mut next_region_number =
62            Self::get_next_region_number(table_route_value.max_region_number().unwrap());
63
64        // Converts allocation plan to repartition plan.
65        let repartition_plan_entries = Self::convert_to_repartition_plans(
66            table_id,
67            &mut next_region_number,
68            &self.plan_entries,
69        );
70        let plan_count = repartition_plan_entries.len();
71        let to_allocate = Self::count_regions_to_allocate(&repartition_plan_entries);
72        info!(
73            "Repartition allocate regions start, table_id: {}, groups: {}, regions_to_allocate: {}",
74            table_id, plan_count, to_allocate
75        );
76
77        // If no region to allocate, directly dispatch the plan.
78        if Self::count_regions_to_allocate(&repartition_plan_entries) == 0 {
79            ctx.persistent_ctx.plans = repartition_plan_entries;
80            ctx.update_allocate_region_elapsed(timer.elapsed());
81            return Ok((Box::new(Dispatch), Status::executing(true)));
82        }
83
84        let allocate_regions = Self::collect_allocate_regions(&repartition_plan_entries);
85        let region_number_and_partition_exprs =
86            Self::prepare_region_allocation_data(&allocate_regions)?;
87        let table_info_value = ctx.get_table_info_value().await?;
88        let new_allocated_region_routes = ctx
89            .region_routes_allocator
90            .allocate(
91                table_id,
92                &region_number_and_partition_exprs
93                    .iter()
94                    .map(|(n, p)| (*n, p.as_str()))
95                    .collect::<Vec<_>>(),
96            )
97            .await
98            .context(error::AllocateRegionRoutesSnafu { table_id })?;
99        let wal_options = ctx
100            .wal_options_allocator
101            .allocate(
102                &allocate_regions
103                    .iter()
104                    .map(|r| r.region_id.region_number())
105                    .collect::<Vec<_>>(),
106                table_info_value.table_info.meta.options.skip_wal,
107            )
108            .await
109            .context(error::AllocateWalOptionsSnafu { table_id })?;
110
111        let new_region_count = new_allocated_region_routes.len();
112        let new_regions_brief: Vec<_> = new_allocated_region_routes
113            .iter()
114            .map(|route| {
115                let region_id = route.region.id;
116                let peer = route.leader_peer.as_ref().map(|p| p.id).unwrap_or_default();
117                format!("region_id: {}, peer: {}", region_id, peer)
118            })
119            .collect();
120        info!(
121            "Allocated regions for repartition, table_id: {}, new_region_count: {}, new_regions: {:?}",
122            table_id, new_region_count, new_regions_brief
123        );
124
125        let _operating_guards = Self::register_operating_regions(
126            &ctx.memory_region_keeper,
127            &new_allocated_region_routes,
128        )?;
129        // Allocates the regions on datanodes.
130        Self::allocate_regions(
131            &ctx.node_manager,
132            &table_info_value.table_info,
133            &new_allocated_region_routes,
134            &wal_options,
135        )
136        .await?;
137
138        // TODO(weny): for metric engine, sync logical regions from the the central region.
139
140        // Updates the table routes.
141        let table_lock = TableLock::Write(table_id).into();
142        let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await;
143        let new_region_routes =
144            Self::generate_region_routes(region_routes, &new_allocated_region_routes);
145        ctx.update_table_route(&table_route_value, new_region_routes, wal_options)
146            .await?;
147        ctx.invalidate_table_cache().await?;
148
149        ctx.persistent_ctx.plans = repartition_plan_entries;
150        ctx.update_allocate_region_elapsed(timer.elapsed());
151        Ok((Box::new(Dispatch), Status::executing(true)))
152    }
153
154    fn as_any(&self) -> &dyn Any {
155        self
156    }
157}
158
159impl AllocateRegion {
160    pub fn new(plan_entries: Vec<AllocationPlanEntry>) -> Self {
161        Self { plan_entries }
162    }
163
164    fn register_operating_regions(
165        memory_region_keeper: &MemoryRegionKeeperRef,
166        region_routes: &[RegionRoute],
167    ) -> Result<Vec<OperatingRegionGuard>> {
168        let mut operating_guards = Vec::with_capacity(region_routes.len());
169        for (region_id, datanode_id) in operating_leader_regions(region_routes) {
170            let guard = memory_region_keeper
171                .register(datanode_id, region_id)
172                .context(error::RegionOperatingRaceSnafu {
173                    peer_id: datanode_id,
174                    region_id,
175                })?;
176            operating_guards.push(guard);
177        }
178        Ok(operating_guards)
179    }
180
181    fn generate_region_routes(
182        region_routes: &[RegionRoute],
183        new_allocated_region_ids: &[RegionRoute],
184    ) -> Vec<RegionRoute> {
185        let region_ids = region_routes
186            .iter()
187            .map(|r| r.region.id)
188            .collect::<HashSet<_>>();
189        let mut new_region_routes = region_routes.to_vec();
190        for new_allocated_region_id in new_allocated_region_ids {
191            if !region_ids.contains(&new_allocated_region_id.region.id) {
192                new_region_routes.push(new_allocated_region_id.clone());
193            }
194        }
195        new_region_routes
196    }
197
198    /// Converts allocation plan entries to repartition plan entries.
199    ///
200    /// This method takes the allocation plan entries and converts them to repartition plan entries,
201    /// updating `next_region_number` for each newly allocated region.
202    fn convert_to_repartition_plans(
203        table_id: TableId,
204        next_region_number: &mut RegionNumber,
205        plan_entries: &[AllocationPlanEntry],
206    ) -> Vec<RepartitionPlanEntry> {
207        plan_entries
208            .iter()
209            .map(|plan_entry| {
210                convert_allocation_plan_to_repartition_plan(
211                    table_id,
212                    next_region_number,
213                    plan_entry,
214                )
215            })
216            .collect()
217    }
218
219    /// Collects all regions that need to be allocated from the repartition plan entries.
220    fn collect_allocate_regions(
221        repartition_plan_entries: &[RepartitionPlanEntry],
222    ) -> Vec<&RegionDescriptor> {
223        repartition_plan_entries
224            .iter()
225            .flat_map(|p| p.allocate_regions())
226            .collect()
227    }
228
229    /// Prepares region allocation data: region numbers and their partition expressions.
230    fn prepare_region_allocation_data(
231        allocate_regions: &[&RegionDescriptor],
232    ) -> Result<Vec<(RegionNumber, String)>> {
233        allocate_regions
234            .iter()
235            .map(|r| {
236                Ok((
237                    r.region_id.region_number(),
238                    r.partition_expr
239                        .as_json_str()
240                        .context(error::SerializePartitionExprSnafu)?,
241                ))
242            })
243            .collect()
244    }
245
246    /// Calculates the total number of regions that need to be allocated.
247    fn count_regions_to_allocate(repartition_plan_entries: &[RepartitionPlanEntry]) -> usize {
248        repartition_plan_entries
249            .iter()
250            .map(|p| p.allocated_region_ids.len())
251            .sum()
252    }
253
254    /// Gets the next region number from the physical table route.
255    fn get_next_region_number(max_region_number: RegionNumber) -> RegionNumber {
256        max_region_number + 1
257    }
258
259    async fn allocate_regions(
260        node_manager: &NodeManagerRef,
261        raw_table_info: &RawTableInfo,
262        region_routes: &[RegionRoute],
263        wal_options: &HashMap<RegionNumber, String>,
264    ) -> Result<()> {
265        let table_ref = TableReference::full(
266            &raw_table_info.catalog_name,
267            &raw_table_info.schema_name,
268            &raw_table_info.name,
269        );
270        let table_id = raw_table_info.ident.table_id;
271        // Repartition allocation targets physical regions, so exclude metric internal columns
272        // and derive primary keys from tag semantics.
273        let request = build_template_from_raw_table_info_for_physical_table(raw_table_info)
274            .context(error::BuildCreateRequestSnafu { table_id })?;
275        common_telemetry::debug!(
276            "Allocating regions request, table_id: {}, request: {:?}",
277            table_id,
278            request
279        );
280        let builder = CreateRequestBuilder::new(request, None);
281        let region_count = region_routes.len();
282        let wal_region_count = wal_options.len();
283        info!(
284            "Allocating regions on datanodes, table_id: {}, region_count: {}, wal_regions: {}",
285            table_id, region_count, wal_region_count
286        );
287        let executor = CreateTableExecutor::new(table_ref.into(), false, builder);
288        executor
289            .on_create_regions(node_manager, table_id, region_routes, wal_options)
290            .await
291            .context(error::AllocateRegionsSnafu { table_id })?;
292
293        Ok(())
294    }
295}
296
#[cfg(test)]
mod tests {
    use store_api::storage::RegionId;
    use uuid::Uuid;

    use super::*;
    use crate::procedure::repartition::test_util::range_expr;

    /// Table id shared by all tests.
    const TABLE_ID: TableId = 1024;
    /// Region number handed to the conversion as the first free one.
    const FIRST_NEW_REGION: RegionNumber = 10;

    /// Builds a descriptor for region `region_number` of `table_id` whose partition
    /// expression is `range_expr(col, start, end)`.
    fn create_region_descriptor(
        table_id: TableId,
        region_number: u32,
        col: &str,
        start: i64,
        end: i64,
    ) -> RegionDescriptor {
        let region_id = RegionId::new(table_id, region_number);
        let partition_expr = range_expr(col, start, end);
        RegionDescriptor {
            region_id,
            partition_expr,
        }
    }

    /// Builds an allocation plan entry with one source region per given number (the
    /// i-th source uses the range `i * 100 .. (i + 1) * 100` on column `x`) and one
    /// target partition expression per given range.
    fn create_allocation_plan_entry(
        table_id: TableId,
        source_region_numbers: &[u32],
        target_ranges: &[(i64, i64)],
    ) -> AllocationPlanEntry {
        AllocationPlanEntry {
            group_id: Uuid::new_v4(),
            source_regions: source_region_numbers
                .iter()
                .enumerate()
                .map(|(index, &number)| {
                    let start = index as i64 * 100;
                    create_region_descriptor(table_id, number, "x", start, start + 100)
                })
                .collect(),
            target_partition_exprs: target_ranges
                .iter()
                .map(|&(start, end)| range_expr("x", start, end))
                .collect(),
            transition_map: vec![],
        }
    }

    /// Runs the conversion starting at `FIRST_NEW_REGION` and returns the plans
    /// together with the region number counter after the conversion.
    fn convert(plan_entries: &[AllocationPlanEntry]) -> (Vec<RepartitionPlanEntry>, RegionNumber) {
        let mut next_region_number = FIRST_NEW_REGION;
        let plans = AllocateRegion::convert_to_repartition_plans(
            TABLE_ID,
            &mut next_region_number,
            plan_entries,
        );
        (plans, next_region_number)
    }

    #[test]
    fn test_convert_to_repartition_plans_no_allocation() {
        // 2 source -> 2 target (no allocation needed)
        let entries = [create_allocation_plan_entry(
            TABLE_ID,
            &[1, 2],
            &[(0, 50), (50, 200)],
        )];

        let (plans, next_region_number) = convert(&entries);

        assert_eq!(plans.len(), 1);
        let plan = &plans[0];
        assert_eq!(plan.target_regions.len(), 2);
        assert!(plan.allocated_region_ids.is_empty());
        // next_region_number should not change
        assert_eq!(next_region_number, 10);
    }

    #[test]
    fn test_convert_to_repartition_plans_with_allocation() {
        // 2 source -> 4 target (need to allocate 2 regions)
        let entries = [create_allocation_plan_entry(
            TABLE_ID,
            &[1, 2],
            &[(0, 50), (50, 100), (100, 150), (150, 200)],
        )];

        let (plans, next_region_number) = convert(&entries);

        assert_eq!(plans.len(), 1);
        let plan = &plans[0];
        assert_eq!(plan.target_regions.len(), 4);
        assert_eq!(plan.allocated_region_ids.len(), 2);
        // The new regions are numbered 10 and 11, in that order.
        for (region_id, number) in plan.allocated_region_ids.iter().zip(FIRST_NEW_REGION..) {
            assert_eq!(*region_id, RegionId::new(TABLE_ID, number));
        }
        // next_region_number should be incremented by 2
        assert_eq!(next_region_number, 12);
    }

    #[test]
    fn test_convert_to_repartition_plans_multiple_entries() {
        // Multiple plan entries with different allocation needs
        let entries = [
            // need 1 allocation
            create_allocation_plan_entry(TABLE_ID, &[1], &[(0, 50), (50, 100)]),
            // no allocation
            create_allocation_plan_entry(TABLE_ID, &[2, 3], &[(100, 150), (150, 200)]),
            // need 2 allocations
            create_allocation_plan_entry(TABLE_ID, &[4], &[(200, 250), (250, 300), (300, 400)]),
        ];

        let (plans, next_region_number) = convert(&entries);

        assert_eq!(plans.len(), 3);
        let allocated_per_plan: Vec<usize> = plans
            .iter()
            .map(|plan| plan.allocated_region_ids.len())
            .collect();
        assert_eq!(allocated_per_plan, vec![1usize, 0, 2]);
        // next_region_number should be incremented by 3 total
        assert_eq!(next_region_number, 13);
    }

    #[test]
    fn test_count_regions_to_allocate() {
        let entries = [
            // 1 allocation
            create_allocation_plan_entry(TABLE_ID, &[1], &[(0, 50), (50, 100)]),
            // 0 allocation (deallocate)
            create_allocation_plan_entry(TABLE_ID, &[2, 3], &[(100, 200)]),
            // 1 allocation
            create_allocation_plan_entry(TABLE_ID, &[4], &[(200, 250), (250, 300)]),
        ];

        let (plans, _) = convert(&entries);

        assert_eq!(AllocateRegion::count_regions_to_allocate(&plans), 2);
    }

    #[test]
    fn test_collect_allocate_regions() {
        let entries = [
            // 1 allocation
            create_allocation_plan_entry(TABLE_ID, &[1], &[(0, 50), (50, 100)]),
            // 1 allocation
            create_allocation_plan_entry(TABLE_ID, &[2], &[(100, 150), (150, 200)]),
        ];

        let (plans, _) = convert(&entries);
        let to_allocate = AllocateRegion::collect_allocate_regions(&plans);

        assert_eq!(to_allocate.len(), 2);
        // One region per entry, numbered consecutively from the first free number.
        for (region, number) in to_allocate.iter().zip(FIRST_NEW_REGION..) {
            assert_eq!(region.region_id, RegionId::new(TABLE_ID, number));
        }
    }

    #[test]
    fn test_prepare_region_allocation_data() {
        let descriptors = [
            create_region_descriptor(TABLE_ID, 10, "x", 0, 50),
            create_region_descriptor(TABLE_ID, 11, "x", 50, 100),
        ];
        let descriptor_refs: Vec<&RegionDescriptor> = descriptors.iter().collect();

        let prepared = AllocateRegion::prepare_region_allocation_data(&descriptor_refs).unwrap();

        assert_eq!(prepared.len(), 2);
        for ((region_number, expr_json), expected_number) in prepared.iter().zip(10u32..) {
            assert_eq!(*region_number, expected_number);
            // Verify partition expressions are serialized
            assert!(!expr_json.is_empty());
        }
    }
}