meta_srv/procedure/wal_prune.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

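//! Procedure that prunes the Kafka-based remote WAL for a topic.
//!
//! The procedure runs as a small state machine: `Prepare` collects the prunable entry ids of all
//! regions in the topic, `FlushRegion` sends flush requests to regions that lag behind, and
//! `Prune` persists the pruned entry id and deletes the Kafka records before it.
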
pub(crate) mod manager;
#[cfg(test)]
mod test_util;

use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;

use api::v1::meta::MailboxMessage;
use common_error::ext::BoxedError;
use common_meta::instruction::{FlushRegions, Instruction};
use common_meta::key::TableMetadataManagerRef;
use common_meta::lock_key::RemoteWalLock;
use common_meta::peer::Peer;
use common_meta::region_registry::LeaderRegionRegistryRef;
use common_procedure::error::ToJsonSnafu;
use common_procedure::{
    Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
    Result as ProcedureResult, Status, StringKey,
};
use common_telemetry::{info, warn};
use itertools::{Itertools, MinMaxResult};
use log_store::kafka::DEFAULT_PARTITION;
use manager::{WalPruneProcedureGuard, WalPruneProcedureTracker};
use rskafka::client::partition::UnknownTopicHandling;
use rskafka::client::Client;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use store_api::logstore::EntryId;
use store_api::storage::RegionId;

use crate::error::{
    self, BuildPartitionClientSnafu, DeleteRecordsSnafu, TableMetadataManagerSnafu,
    UpdateTopicNameValueSnafu,
};
use crate::service::mailbox::{Channel, MailboxRef};
use crate::Result;

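/// A shared handle to the Kafka client used to access the remote WAL.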
pub type KafkaClientRef = Arc<Client>;

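/// Timeout for Kafka `delete_records` requests.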
const DELETE_RECORDS_TIMEOUT: Duration = Duration::from_secs(5);

/// The state of WAL pruning.
#[derive(Debug, Serialize, Deserialize)]
pub enum WalPruneState {
    /// Collects the prunable entry id and the regions to flush.
    Prepare,
    /// Sends flush requests to the lagging regions.
    FlushRegion,
    /// Deletes the Kafka records up to the prunable entry id.
    Prune,
}

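/// The context of the WAL prune procedure, holding the shared components it depends on.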
#[derive(Clone)]
pub struct Context {
    /// The Kafka client.
    pub client: KafkaClientRef,
    /// The table metadata manager.
    pub table_metadata_manager: TableMetadataManagerRef,
    /// The leader region registry.
    pub leader_region_registry: LeaderRegionRegistryRef,
    /// Server address of metasrv.
    pub server_addr: String,
    /// The mailbox to send messages.
    pub mailbox: MailboxRef,
}

/// The data of WAL pruning.
#[derive(Serialize, Deserialize)]
pub struct WalPruneData {
    /// The topic name to prune.
    pub topic: String,
    /// The minimum prunable entry id of all regions in the topic, which is used to prune the WAL.
    pub prunable_entry_id: EntryId,
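    /// The regions to send flush requests to, i.e. those whose prunable entry ids lag behind the
    /// maximum prunable entry id by more than `trigger_flush_threshold`.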
    pub regions_to_flush: Vec<RegionId>,
    /// If a region's `prunable_entry_id` + `trigger_flush_threshold` < `max_prunable_entry_id`, a flush request is sent to that region.
    /// If the threshold is 0, flush requests are never sent.
    pub trigger_flush_threshold: u64,
    /// The state.
    pub state: WalPruneState,
}

/// The procedure to prune the WAL of a topic.
pub struct WalPruneProcedure {
    pub data: WalPruneData,
    pub context: Context,
    pub _guard: Option<WalPruneProcedureGuard>,
}

impl WalPruneProcedure {
    const TYPE_NAME: &'static str = "metasrv-procedure::WalPrune";

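    /// Creates a new [`WalPruneProcedure`] for the given topic, starting in the [`WalPruneState::Prepare`] state.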
    pub fn new(
        topic: String,
        context: Context,
        trigger_flush_threshold: u64,
        guard: Option<WalPruneProcedureGuard>,
    ) -> Self {
        Self {
            data: WalPruneData {
                topic,
                prunable_entry_id: 0,
                trigger_flush_threshold,
                regions_to_flush: vec![],
                state: WalPruneState::Prepare,
            },
            context,
            _guard: guard,
        }
    }

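    /// Recovers a [`WalPruneProcedure`] from serialized [`WalPruneData`] and registers it in the tracker.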
    pub fn from_json(
        json: &str,
        context: &Context,
        tracker: WalPruneProcedureTracker,
    ) -> ProcedureResult<Self> {
        let data: WalPruneData = serde_json::from_str(json).context(ToJsonSnafu)?;
        let guard = tracker.insert_running_procedure(data.topic.clone());
        Ok(Self {
            data,
            context: context.clone(),
            _guard: guard,
        })
    }

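    /// Builds a map from each region's leader peer to the region ids it hosts, based on the physical table routes.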
    async fn build_peer_to_region_ids_map(
        &self,
        ctx: &Context,
        region_ids: &[RegionId],
    ) -> Result<HashMap<Peer, Vec<RegionId>>> {
        let table_ids = region_ids
            .iter()
            .map(|region_id| region_id.table_id())
            .collect::<HashSet<_>>()
            .into_iter()
            .collect::<Vec<_>>();
        let table_ids_table_routes_map = ctx
            .table_metadata_manager
            .table_route_manager()
            .batch_get_physical_table_routes(&table_ids)
            .await
            .context(TableMetadataManagerSnafu)?;

        let mut peer_region_ids_map = HashMap::new();
        for region_id in region_ids {
            let table_id = region_id.table_id();
            let table_route = match table_ids_table_routes_map.get(&table_id) {
                Some(route) => route,
                None => return error::TableRouteNotFoundSnafu { table_id }.fail(),
            };
            for region_route in &table_route.region_routes {
                if region_route.region.id != *region_id {
                    continue;
                }
                if let Some(peer) = &region_route.leader_peer {
                    peer_region_ids_map
                        .entry(peer.clone())
                        .or_insert_with(Vec::new)
                        .push(*region_id);
                }
            }
        }
        Ok(peer_region_ids_map)
    }

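    /// Builds a [`FlushRegions`] instruction for each peer in the map.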
    fn build_flush_region_instruction(
        &self,
        peer_region_ids_map: HashMap<Peer, Vec<RegionId>>,
    ) -> Result<Vec<(Peer, Instruction)>> {
        let peer_and_instructions = peer_region_ids_map
            .into_iter()
            .map(|(peer, region_ids)| {
                let flush_instruction = Instruction::FlushRegions(FlushRegions { region_ids });
                (peer.clone(), flush_instruction)
            })
            .collect();
        Ok(peer_and_instructions)
    }

    /// Prepare the entry id to prune and regions to flush.
    ///
    /// Retry:
    /// - Failed to retrieve any metadata.
    pub async fn on_prepare(&mut self) -> Result<Status> {
        let region_ids = self
            .context
            .table_metadata_manager
            .topic_region_manager()
            .regions(&self.data.topic)
            .await
            .context(TableMetadataManagerSnafu)
            .map_err(BoxedError::new)
            .with_context(|_| error::RetryLaterWithSourceSnafu {
                reason: "Failed to get topic-region map",
            })?;
        let prunable_entry_ids_map: HashMap<_, _> = self
            .context
            .leader_region_registry
            .batch_get(region_ids.iter().cloned())
            .into_iter()
            .map(|(region_id, region)| {
                let prunable_entry_id = region.manifest.prunable_entry_id();
                (region_id, prunable_entry_id)
            })
            .collect();

        // Check if the `prunable_entry_ids_map` contains all region ids.
        let non_collected_region_ids =
            check_heartbeat_collected_region_ids(&region_ids, &prunable_entry_ids_map);
        if !non_collected_region_ids.is_empty() {
            // The heartbeat collected region ids do not contain all region ids in the topic-region map.
            // In this case, we should not prune the WAL.
            warn!("The heartbeat collected region ids do not contain all region ids in the topic-region map. Aborting the WAL prune procedure.
            topic: {}, non-collected region ids: {:?}", self.data.topic, non_collected_region_ids);
            return Ok(Status::done());
        }

        let min_max_result = prunable_entry_ids_map.values().minmax();
        let max_prunable_entry_id = match min_max_result {
            MinMaxResult::NoElements => {
                return Ok(Status::done());
            }
            MinMaxResult::OneElement(prunable_entry_id) => {
                self.data.prunable_entry_id = *prunable_entry_id;
                *prunable_entry_id
            }
            MinMaxResult::MinMax(min_prunable_entry_id, max_prunable_entry_id) => {
                self.data.prunable_entry_id = *min_prunable_entry_id;
                *max_prunable_entry_id
            }
        };
        if self.data.trigger_flush_threshold != 0 {
            for (region_id, prunable_entry_id) in prunable_entry_ids_map {
                if prunable_entry_id + self.data.trigger_flush_threshold < max_prunable_entry_id {
                    self.data.regions_to_flush.push(region_id);
                }
            }
            self.data.state = WalPruneState::FlushRegion;
        } else {
            self.data.state = WalPruneState::Prune;
        }
        Ok(Status::executing(true))
    }

    /// Sends flush requests to the regions whose prunable entry ids lag behind.
    ///
    /// Retry:
    /// - Failed to build the peer to region ids map, which indicates a failure in retrieving metadata.
    pub async fn on_sending_flush_request(&mut self) -> Result<Status> {
        let peer_to_region_ids_map = self
            .build_peer_to_region_ids_map(&self.context, &self.data.regions_to_flush)
            .await
            .map_err(BoxedError::new)
            .with_context(|_| error::RetryLaterWithSourceSnafu {
                reason: "Failed to build peer to region ids map",
            })?;
        let flush_instructions = self.build_flush_region_instruction(peer_to_region_ids_map)?;
        for (peer, flush_instruction) in flush_instructions.into_iter() {
            let msg = MailboxMessage::json_message(
                &format!("Flush regions: {}", flush_instruction),
                &format!("Metasrv@{}", self.context.server_addr),
                &format!("Datanode-{}@{}", peer.id, peer.addr),
                common_time::util::current_time_millis(),
                &flush_instruction,
            )
            .with_context(|_| error::SerializeToJsonSnafu {
                input: flush_instruction.to_string(),
            })?;
            let ch = Channel::Datanode(peer.id);
            self.context.mailbox.send_oneway(&ch, msg).await?;
        }
        self.data.state = WalPruneState::Prune;
        Ok(Status::executing(true))
    }

    /// Prune the WAL and persist the minimum prunable entry id.
    ///
    /// Retry:
    /// - Failed to update the minimum prunable entry id in the kv backend.
    /// - Failed to delete records.
    pub async fn on_prune(&mut self) -> Result<Status> {
        // Safety: `prunable_entry_id` is loaded in `on_prepare`.
        let partition_client = self
            .context
            .client
            .partition_client(
                self.data.topic.clone(),
                DEFAULT_PARTITION,
                UnknownTopicHandling::Retry,
            )
            .await
            .context(BuildPartitionClientSnafu {
                topic: self.data.topic.clone(),
                partition: DEFAULT_PARTITION,
            })?;

        // The min prunable entry id must be updated in the kv backend before deleting records.
        // Otherwise, when a datanode restarts, it will not be able to find the WAL entries.
        let prev = self
            .context
            .table_metadata_manager
            .topic_name_manager()
            .get(&self.data.topic)
            .await
            .context(TableMetadataManagerSnafu)
            .map_err(BoxedError::new)
            .with_context(|_| error::RetryLaterWithSourceSnafu {
                reason: format!("Failed to get TopicNameValue, topic: {}", self.data.topic),
            })?;
        self.context
            .table_metadata_manager
            .topic_name_manager()
            .update(&self.data.topic, self.data.prunable_entry_id, prev)
            .await
            .context(UpdateTopicNameValueSnafu {
                topic: &self.data.topic,
            })
            .map_err(BoxedError::new)
            .with_context(|_| error::RetryLaterWithSourceSnafu {
                reason: format!(
                    "Failed to update pruned entry id for topic: {}",
                    self.data.topic
                ),
            })?;
        partition_client
            .delete_records(
                // Note: no "+1" is needed here because the offset argument is exclusive; this is also
                // defensive programming in case an off-by-one error exists elsewhere. See
                // https://kafka.apache.org/36/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#endOffsets(java.util.Collection),
                // which we use to get the end offset from the high watermark.
                self.data.prunable_entry_id as i64,
                DELETE_RECORDS_TIMEOUT.as_millis() as i32,
            )
            .await
            .context(DeleteRecordsSnafu {
                topic: &self.data.topic,
                partition: DEFAULT_PARTITION,
                offset: self.data.prunable_entry_id,
            })
            .map_err(BoxedError::new)
            .with_context(|_| error::RetryLaterWithSourceSnafu {
                reason: format!(
                    "Failed to delete records for topic: {}, partition: {}, offset: {}",
                    self.data.topic, DEFAULT_PARTITION, self.data.prunable_entry_id
                ),
            })?;
        info!(
            "Successfully pruned WAL for topic: {}, entry id: {}",
            self.data.topic, self.data.prunable_entry_id
        );
        Ok(Status::done())
    }
}

#[async_trait::async_trait]
impl Procedure for WalPruneProcedure {
    fn type_name(&self) -> &str {
        Self::TYPE_NAME
    }

    fn rollback_supported(&self) -> bool {
        false
    }

    async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
        let state = &self.data.state;

        match state {
            WalPruneState::Prepare => self.on_prepare().await,
            WalPruneState::FlushRegion => self.on_sending_flush_request().await,
            WalPruneState::Prune => self.on_prune().await,
        }
        .map_err(|e| {
            if e.is_retryable() {
                ProcedureError::retry_later(e)
            } else {
                ProcedureError::external(e)
            }
        })
    }

    fn dump(&self) -> ProcedureResult<String> {
        serde_json::to_string(&self.data).context(ToJsonSnafu)
    }

    /// The WAL prune procedure reads the topic-region map from the table metadata manager,
    /// which is modified by `DROP [TABLE|DATABASE]` and `CREATE [TABLE]` operations.
    /// Those modifications are atomic, so they do not conflict with this procedure;
    /// at worst they cause `check_heartbeat_collected_region_ids` to fail, which aborts the procedure.
    fn lock_key(&self) -> LockKey {
        let lock_key: StringKey = RemoteWalLock::Write(self.data.topic.clone()).into();
        LockKey::new(vec![lock_key])
    }
}

/// Checks whether the heartbeat-collected region ids contain all region ids in the topic-region map,
/// returning the region ids that are missing.
fn check_heartbeat_collected_region_ids(
    region_ids: &[RegionId],
    heartbeat_collected_region_ids: &HashMap<RegionId, u64>,
) -> Vec<RegionId> {
    let mut non_collected_region_ids = Vec::new();
    for region_id in region_ids {
        if !heartbeat_collected_region_ids.contains_key(region_id) {
            non_collected_region_ids.push(*region_id);
        }
    }
    non_collected_region_ids
}

#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use api::v1::meta::HeartbeatResponse;
    use common_wal::test_util::run_test_with_kafka_wal;
    use rskafka::record::Record;
    use tokio::sync::mpsc::Receiver;

    use super::*;
    use crate::handler::HeartbeatMailbox;
    use crate::procedure::test_util::new_wal_prune_metadata;
    use crate::procedure::wal_prune::test_util::TestEnv;

    /// Mocks test data for the procedure:
    /// 1. Prepares some data in the table metadata manager and the in-memory kv backend.
    /// 2. Returns the minimum prunable entry id and the regions to flush.
    async fn mock_test_data(procedure: &WalPruneProcedure) -> (u64, Vec<RegionId>) {
        let n_region = 10;
        let n_table = 5;
        // 5 entries per region.
        let offsets = mock_wal_entries(
            procedure.context.client.clone(),
            &procedure.data.topic,
            (n_region * n_table * 5) as usize,
        )
        .await;
        let (prunable_entry_id, regions_to_flush) = new_wal_prune_metadata(
            procedure.context.table_metadata_manager.clone(),
            procedure.context.leader_region_registry.clone(),
            n_region,
            n_table,
            &offsets,
            procedure.data.trigger_flush_threshold,
            procedure.data.topic.clone(),
        )
        .await;
        (prunable_entry_id, regions_to_flush)
    }

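    /// Builds a test Kafka record whose key and value are derived from `i`.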
    fn record(i: usize) -> Record {
        let key = format!("key_{i}");
        let value = format!("value_{i}");
        Record {
            key: Some(key.into()),
            value: Some(value.into()),
            timestamp: chrono::Utc::now(),
            headers: Default::default(),
        }
    }

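    /// Creates the topic if it does not exist and produces `n_entries` records, returning their offsets.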
    async fn mock_wal_entries(
        client: KafkaClientRef,
        topic_name: &str,
        n_entries: usize,
    ) -> Vec<i64> {
        let controller_client = client.controller_client().unwrap();
        let _ = controller_client
            .create_topic(topic_name, 1, 1, 5_000)
            .await;
        let partition_client = client
            .partition_client(topic_name, 0, UnknownTopicHandling::Retry)
            .await
            .unwrap();
        let mut offsets = Vec::with_capacity(n_entries);
        for i in 0..n_entries {
            let record = vec![record(i)];
            let offset = partition_client
                .produce(
                    record,
                    rskafka::client::partition::Compression::NoCompression,
                )
                .await
                .unwrap();
            offsets.extend(offset);
        }
        offsets
    }

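    /// Asserts whether records at `entry_id` can still be fetched from the topic.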
    async fn check_entry_id_existence(
        client: KafkaClientRef,
        topic_name: &str,
        entry_id: i64,
        expect_success: bool,
    ) {
        let partition_client = client
            .partition_client(topic_name, 0, UnknownTopicHandling::Retry)
            .await
            .unwrap();
        let res = partition_client
            .fetch_records(entry_id, 0..10001, 5_000)
            .await;
        if expect_success {
            assert!(res.is_ok());
            let (record, _high_watermark) = res.unwrap();
            assert!(!record.is_empty());
        } else {
            let err = res.unwrap_err();
            // The error is in a private module so we check it through `to_string()`.
            assert!(err.to_string().contains("OffsetOutOfRange"));
        }
    }

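    /// Deletes the test topic.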
    async fn delete_topic(client: KafkaClientRef, topic_name: &str) {
        let controller_client = client.controller_client().unwrap();
        controller_client
            .delete_topic(topic_name, 5_000)
            .await
            .unwrap();
    }

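    /// Receives a mailbox message from `rx` and asserts it is a flush instruction targeting exactly `region_ids`.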
    async fn check_flush_request(
        rx: &mut Receiver<std::result::Result<HeartbeatResponse, tonic::Status>>,
        region_ids: &[RegionId],
    ) {
        let resp = rx.recv().await.unwrap().unwrap();
        let msg = resp.mailbox_message.unwrap();
        let flush_instruction = HeartbeatMailbox::json_instruction(&msg).unwrap();
        let mut flush_requested_region_ids = match flush_instruction {
            Instruction::FlushRegions(FlushRegions { region_ids, .. }) => region_ids,
            _ => unreachable!(),
        };
        let sorted_region_ids = region_ids
            .iter()
            .cloned()
            .sorted_by_key(|a| a.as_u64())
            .collect::<Vec<_>>();
        flush_requested_region_ids.sort_by_key(|a| a.as_u64());
        assert_eq!(flush_requested_region_ids, sorted_region_ids);
    }

    #[tokio::test]
    async fn test_procedure_execution() {
        run_test_with_kafka_wal(|broker_endpoints| {
            Box::pin(async {
                common_telemetry::init_default_ut_logging();
                let mut topic_name = uuid::Uuid::new_v4().to_string();
                // Topic should start with a letter.
                topic_name = format!("test_procedure_execution-{}", topic_name);
                let mut env = TestEnv::new();
                let context = env.build_wal_prune_context(broker_endpoints).await;
                TestEnv::prepare_topic(&context.client, &topic_name).await;
                let mut procedure = WalPruneProcedure::new(topic_name.clone(), context, 10, None);

                // Before any data is mocked in the kv backend, this should return a retryable error.
                let result = procedure.on_prune().await;
                assert_matches!(result, Err(e) if e.is_retryable());

                let (prunable_entry_id, regions_to_flush) = mock_test_data(&procedure).await;

                // Step 1: Test `on_prepare`.
                let status = procedure.on_prepare().await.unwrap();
                assert_matches!(
                    status,
                    Status::Executing {
                        persist: true,
                        clean_poisons: false
                    }
                );
                assert_matches!(procedure.data.state, WalPruneState::FlushRegion);
                assert_eq!(procedure.data.prunable_entry_id, prunable_entry_id);
                assert_eq!(
                    procedure.data.regions_to_flush.len(),
                    regions_to_flush.len()
                );
                for region_id in &regions_to_flush {
                    assert!(procedure.data.regions_to_flush.contains(region_id));
                }

                // Step 2: Test `on_sending_flush_request`.
                let (tx, mut rx) = tokio::sync::mpsc::channel(1);
                env.mailbox
                    .insert_heartbeat_response_receiver(Channel::Datanode(1), tx)
                    .await;
                let status = procedure.on_sending_flush_request().await.unwrap();
                check_flush_request(&mut rx, &regions_to_flush).await;
                assert_matches!(
                    status,
                    Status::Executing {
                        persist: true,
                        clean_poisons: false
                    }
                );
                assert_matches!(procedure.data.state, WalPruneState::Prune);

                // Step 3: Test `on_prune`.
                let status = procedure.on_prune().await.unwrap();
                assert_matches!(status, Status::Done { output: None });
                // Check that the entry ids at and after `prunable_entry_id` still exist.
                check_entry_id_existence(
                    procedure.context.client.clone(),
                    &topic_name,
                    procedure.data.prunable_entry_id as i64,
                    true,
                )
                .await;
                // Check if the entry ids before `prunable_entry_id` are deleted.
                check_entry_id_existence(
                    procedure.context.client.clone(),
                    &topic_name,
                    procedure.data.prunable_entry_id as i64 - 1,
                    false,
                )
                .await;

                let value = env
                    .table_metadata_manager
                    .topic_name_manager()
                    .get(&topic_name)
                    .await
                    .unwrap()
                    .unwrap();
                assert_eq!(value.pruned_entry_id, procedure.data.prunable_entry_id);

                // Step 4: Test `on_prepare`, `check_heartbeat_collected_region_ids` fails.
                // Should log a warning and return `Status::Done`.
                procedure.context.leader_region_registry.reset();
                let status = procedure.on_prepare().await.unwrap();
                assert_matches!(status, Status::Done { output: None });

                // Step 5: Test `on_prepare`, don't flush regions.
                procedure.data.trigger_flush_threshold = 0;
                procedure.on_prepare().await.unwrap();
                assert_matches!(procedure.data.state, WalPruneState::Prune);
                assert_eq!(value.pruned_entry_id, procedure.data.prunable_entry_id);

                // Clean up the topic.
                delete_topic(procedure.context.client, &topic_name).await;
            })
        })
        .await;
    }
}