pub mod builder;

use std::fmt::{self, Display};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration;

use clap::ValueEnum;
use common_base::readable_size::ReadableSize;
use common_base::Plugins;
use common_config::{Configurable, DEFAULT_DATA_HOME};
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::ddl::ProcedureExecutorRef;
use common_meta::distributed_time_constants;
use common_meta::key::runtime_switch::RuntimeSwitchManagerRef;
use common_meta::key::TableMetadataManagerRef;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBackendRef};
use common_meta::leadership_notifier::{
    LeadershipChangeNotifier, LeadershipChangeNotifierCustomizerRef,
};
use common_meta::node_expiry_listener::NodeExpiryListener;
use common_meta::peer::Peer;
use common_meta::region_keeper::MemoryRegionKeeperRef;
use common_meta::region_registry::LeaderRegionRegistryRef;
use common_meta::wal_options_allocator::WalOptionsAllocatorRef;
use common_options::datanode::DatanodeClientOptions;
use common_procedure::options::ProcedureConfig;
use common_procedure::ProcedureManagerRef;
use common_telemetry::logging::{LoggingOptions, TracingOptions};
use common_telemetry::{error, info, warn};
use common_wal::config::MetasrvWalConfig;
use serde::{Deserialize, Serialize};
use servers::export_metrics::ExportMetricsOption;
use servers::grpc::GrpcOptions;
use servers::http::HttpOptions;
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
use table::metadata::TableId;
use tokio::sync::broadcast::error::RecvError;

use crate::cluster::MetaPeerClientRef;
use crate::election::{Election, LeaderChangeMessage};
use crate::error::{
    self, InitMetadataSnafu, KvBackendSnafu, Result, StartProcedureManagerSnafu,
    StartTelemetryTaskSnafu, StopProcedureManagerSnafu,
};
use crate::failure_detector::PhiAccrualFailureDetectorOptions;
use crate::handler::{HeartbeatHandlerGroupBuilder, HeartbeatHandlerGroupRef};
use crate::lease::lookup_datanode_peer;
use crate::procedure::region_migration::manager::RegionMigrationManagerRef;
use crate::procedure::wal_prune::manager::WalPruneTickerRef;
use crate::procedure::ProcedureManagerListenerAdapter;
use crate::pubsub::{PublisherRef, SubscriptionManagerRef};
use crate::region::supervisor::RegionSupervisorTickerRef;
use crate::selector::{RegionStatAwareSelector, Selector, SelectorType};
use crate::service::mailbox::MailboxRef;
use crate::service::store::cached_kv::LeaderCachedKvBackend;
use crate::state::{become_follower, become_leader, StateRef};

/// The name of the sequence used to allocate table ids.
pub const TABLE_ID_SEQ: &str = "table_id";
/// The name of the sequence used to allocate flow ids.
pub const FLOW_ID_SEQ: &str = "flow_id";
/// The directory under the data home reserved for metasrv data.
pub const METASRV_DATA_DIR: &str = "metasrv";

/// The available backend implementations for the metadata store.
#[derive(Clone, Debug, PartialEq, Serialize, Default, Deserialize, ValueEnum)]
#[serde(rename_all = "snake_case")]
pub enum BackendImpl {
    /// Use etcd as the metadata store.
    #[default]
    EtcdStore,
    /// Use an in-memory store (for testing; metadata is not persisted).
    MemoryStore,
    /// Use PostgreSQL as the metadata store.
    #[cfg(feature = "pg_kvbackend")]
    PostgresStore,
    /// Use MySQL as the metadata store.
    #[cfg(feature = "mysql_kvbackend")]
    MysqlStore,
}

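/// The options for a metasrv instance.
///
/// All fields can be set via the configuration file; a minimal TOML sketch
/// (field names mirror this struct, the values are merely illustrative):
///
/// ```toml
/// store_addrs = ["127.0.0.1:2379"]
/// use_memory_store = false
/// enable_region_failover = false
/// max_txn_ops = 128
/// ```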
#[derive(Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct MetasrvOptions {
    /// The address the gRPC server binds to (deprecated).
    #[deprecated(note = "Use grpc.bind_addr instead")]
    pub bind_addr: String,
    /// The address the gRPC server advertises to clients (deprecated).
    #[deprecated(note = "Use grpc.server_addr instead")]
    pub server_addr: String,
    /// The addresses of the metadata store, e.g. etcd endpoints.
    pub store_addrs: Vec<String>,
    /// The selector type used to choose datanodes for region allocation.
    pub selector: SelectorType,
    /// Whether to keep metadata in a memory-only store (for testing).
    pub use_memory_store: bool,
    /// Whether to enable automatic region failover.
    pub enable_region_failover: bool,
    /// The delay before initializing region failure detectors.
    #[serde(with = "humantime_serde")]
    pub region_failure_detector_initialization_delay: Duration,
    /// Whether to allow region failover while using a local WAL; enabling this
    /// may lead to data loss during failover.
    pub allow_region_failover_on_local_wal: bool,
    pub grpc: GrpcOptions,
    pub http: HttpOptions,
    pub logging: LoggingOptions,
    pub procedure: ProcedureConfig,
    pub failure_detector: PhiAccrualFailureDetectorOptions,
    pub datanode: DatanodeClientOptions,
    /// Whether to enable greptimedb telemetry.
    pub enable_telemetry: bool,
    /// The data home directory.
    pub data_home: String,
    /// The WAL options.
    pub wal: MetasrvWalConfig,
    /// The metrics export options.
    pub export_metrics: ExportMetricsOption,
    /// An optional prefix prepended to every key written to the store.
    pub store_key_prefix: String,
    /// The maximum number of operations allowed in a single transaction.
    pub max_txn_ops: usize,
    /// How often datanode stats are flushed, counted in heartbeats
    /// (e.g. 3 means every third heartbeat).
    pub flush_stats_factor: usize,
    /// The tracing options.
    pub tracing: TracingOptions,
    /// The metadata store backend.
    pub backend: BackendImpl,
    /// The table name used to store metadata in RDS backends.
    #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
    pub meta_table_name: String,
    /// The PostgreSQL advisory lock id used for leader election.
    #[cfg(feature = "pg_kvbackend")]
    pub meta_election_lock_id: u64,
    /// The maximum duration a node may stay idle before it is considered expired.
    #[serde(with = "humantime_serde")]
    pub node_max_idle_time: Duration,
}

impl fmt::Debug for MetasrvOptions {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut debug_struct = f.debug_struct("MetasrvOptions");
        debug_struct
            .field("store_addrs", &self.sanitize_store_addrs())
            .field("selector", &self.selector)
            .field("use_memory_store", &self.use_memory_store)
            .field("enable_region_failover", &self.enable_region_failover)
            .field(
                "allow_region_failover_on_local_wal",
                &self.allow_region_failover_on_local_wal,
            )
            .field("grpc", &self.grpc)
            .field("http", &self.http)
            .field("logging", &self.logging)
            .field("procedure", &self.procedure)
            .field("failure_detector", &self.failure_detector)
            .field("datanode", &self.datanode)
            .field("enable_telemetry", &self.enable_telemetry)
            .field("data_home", &self.data_home)
            .field("wal", &self.wal)
            .field("export_metrics", &self.export_metrics)
            .field("store_key_prefix", &self.store_key_prefix)
            .field("max_txn_ops", &self.max_txn_ops)
            .field("flush_stats_factor", &self.flush_stats_factor)
            .field("tracing", &self.tracing)
            .field("backend", &self.backend);

        #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
        debug_struct.field("meta_table_name", &self.meta_table_name);

        #[cfg(feature = "pg_kvbackend")]
        debug_struct.field("meta_election_lock_id", &self.meta_election_lock_id);

        debug_struct
            .field("node_max_idle_time", &self.node_max_idle_time)
            .finish()
    }
}

const DEFAULT_METASRV_ADDR_PORT: &str = "3002";

impl Default for MetasrvOptions {
    fn default() -> Self {
        Self {
            #[allow(deprecated)]
            bind_addr: String::new(),
            #[allow(deprecated)]
            server_addr: String::new(),
            store_addrs: vec!["127.0.0.1:2379".to_string()],
            selector: SelectorType::default(),
            use_memory_store: false,
            enable_region_failover: false,
            region_failure_detector_initialization_delay: Duration::from_secs(10 * 60),
            allow_region_failover_on_local_wal: false,
            grpc: GrpcOptions {
                bind_addr: format!("127.0.0.1:{}", DEFAULT_METASRV_ADDR_PORT),
                ..Default::default()
            },
            http: HttpOptions::default(),
            logging: LoggingOptions::default(),
            procedure: ProcedureConfig {
                max_retry_times: 12,
                retry_delay: Duration::from_millis(500),
                // Keep metadata values below etcd's default 1.5 MiB request size limit.
                max_metadata_value_size: Some(ReadableSize::kb(1500)),
                max_running_procedures: 128,
            },
            failure_detector: PhiAccrualFailureDetectorOptions::default(),
            datanode: DatanodeClientOptions::default(),
            enable_telemetry: true,
            data_home: DEFAULT_DATA_HOME.to_string(),
            wal: MetasrvWalConfig::default(),
            export_metrics: ExportMetricsOption::default(),
            store_key_prefix: String::new(),
            max_txn_ops: 128,
            flush_stats_factor: 3,
            tracing: TracingOptions::default(),
            backend: BackendImpl::EtcdStore,
            #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
            meta_table_name: common_meta::kv_backend::DEFAULT_META_TABLE_NAME.to_string(),
            #[cfg(feature = "pg_kvbackend")]
            meta_election_lock_id: common_meta::kv_backend::DEFAULT_META_ELECTION_LOCK_ID,
            node_max_idle_time: Duration::from_secs(24 * 60 * 60),
        }
    }
}

impl Configurable for MetasrvOptions {
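    /// Keys whose values may also be supplied as comma-separated lists via
    /// environment variables, e.g. (illustrative, assuming the conventional
    /// `GREPTIMEDB_METASRV_` env prefix):
    /// `GREPTIMEDB_METASRV_STORE_ADDRS=ip1:2379,ip2:2379`.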
    fn env_list_keys() -> Option<&'static [&'static str]> {
        Some(&["wal.broker_endpoints", "store_addrs"])
    }
}

impl MetasrvOptions {
    /// Returns the store addresses with any embedded credentials masked, safe for logging.
    fn sanitize_store_addrs(&self) -> Vec<String> {
        self.store_addrs
            .iter()
            .map(|addr| common_meta::kv_backend::util::sanitize_connection_string(addr))
            .collect()
    }
}

pub struct MetasrvInfo {
    pub server_addr: String,
}
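
/// The context shared by metasrv services and heartbeat handlers.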
#[derive(Clone)]
pub struct Context {
    pub server_addr: String,
    pub in_memory: ResettableKvBackendRef,
    pub kv_backend: KvBackendRef,
    pub leader_cached_kv_backend: ResettableKvBackendRef,
    pub meta_peer_client: MetaPeerClientRef,
    pub mailbox: MailboxRef,
    pub election: Option<ElectionRef>,
    pub is_infancy: bool,
    pub table_metadata_manager: TableMetadataManagerRef,
    pub cache_invalidator: CacheInvalidatorRef,
    pub leader_region_registry: LeaderRegionRegistryRef,
}

impl Context {
    /// Resets the in-memory backend and the leader region registry.
    pub fn reset_in_memory(&self) {
        self.in_memory.reset();
        self.leader_region_registry.reset();
    }
}

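/// The value stored under the election key, identifying the current leader.
///
/// Any byte slice converts into a `LeaderValue` via lossy UTF-8 decoding; a
/// minimal illustrative sketch (not from the original source):
///
/// ```rust,ignore
/// let leader = LeaderValue::from(b"127.0.0.1:3002");
/// assert_eq!(leader.0, "127.0.0.1:3002");
/// ```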
pub struct LeaderValue(pub String);

impl<T: AsRef<[u8]>> From<T> for LeaderValue {
    fn from(value: T) -> Self {
        let string = String::from_utf8_lossy(value.as_ref());
        Self(string.to_string())
    }
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetasrvNodeInfo {
    /// The node address.
    pub addr: String,
    /// The node build version.
    pub version: String,
    /// The node build git commit.
    pub git_commit: String,
    /// The node start timestamp in milliseconds.
    pub start_time_ms: u64,
}

impl From<MetasrvNodeInfo> for api::v1::meta::MetasrvNodeInfo {
    fn from(node_info: MetasrvNodeInfo) -> Self {
        Self {
            peer: Some(api::v1::meta::Peer {
                addr: node_info.addr,
                ..Default::default()
            }),
            version: node_info.version,
            git_commit: node_info.git_commit,
            start_time_ms: node_info.start_time_ms,
        }
    }
}

#[derive(Clone, Copy)]
pub enum SelectTarget {
    Datanode,
    Flownode,
}

impl Display for SelectTarget {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            SelectTarget::Datanode => write!(f, "datanode"),
            SelectTarget::Flownode => write!(f, "flownode"),
        }
    }
}

#[derive(Clone)]
pub struct SelectorContext {
    pub server_addr: String,
    pub datanode_lease_secs: u64,
    pub flownode_lease_secs: u64,
    pub kv_backend: KvBackendRef,
    pub meta_peer_client: MetaPeerClientRef,
    pub table_id: Option<TableId>,
}

pub type SelectorRef = Arc<dyn Selector<Context = SelectorContext, Output = Vec<Peer>>>;
pub type RegionStatAwareSelectorRef =
    Arc<dyn RegionStatAwareSelector<Context = SelectorContext, Output = Vec<(RegionId, Peer)>>>;
pub type ElectionRef = Arc<dyn Election<Leader = LeaderValue>>;

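/// Reacts to leadership changes, keeping the leader cache, telemetry reporting,
/// and subscriptions consistent with this node's current role.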
pub struct MetaStateHandler {
    subscribe_manager: Option<SubscriptionManagerRef>,
    greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
    leader_cached_kv_backend: Arc<LeaderCachedKvBackend>,
    leadership_change_notifier: LeadershipChangeNotifier,
    state: StateRef,
}

impl MetaStateHandler {
    pub async fn on_leader_start(&self) {
        self.state.write().unwrap().next_state(become_leader(false));

        if let Err(e) = self.leader_cached_kv_backend.load().await {
            error!(e; "Failed to load kv into leader cache kv store");
        } else {
            self.state.write().unwrap().next_state(become_leader(true));
        }

        self.leadership_change_notifier
            .notify_on_leader_start()
            .await;

        self.greptimedb_telemetry_task.should_report(true);
    }

    pub async fn on_leader_stop(&self) {
        self.state.write().unwrap().next_state(become_follower());

        self.leadership_change_notifier
            .notify_on_leader_stop()
            .await;

        self.greptimedb_telemetry_task.should_report(false);

        if let Some(sub_manager) = self.subscribe_manager.clone() {
            info!("Leader changed, unsubscribing all subscribers");
            if let Err(e) = sub_manager.unsubscribe_all() {
                error!(e; "Failed to unsubscribe all subscribers");
            }
        }
    }
}

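/// The metasrv instance, which owns cluster metadata state and drives leader
/// election, heartbeat handling, and procedure execution.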
pub struct Metasrv {
    state: StateRef,
    started: Arc<AtomicBool>,
    start_time_ms: u64,
    options: MetasrvOptions,
    in_memory: ResettableKvBackendRef,
    kv_backend: KvBackendRef,
    leader_cached_kv_backend: Arc<LeaderCachedKvBackend>,
    meta_peer_client: MetaPeerClientRef,
    selector: SelectorRef,
    selector_ctx: SelectorContext,
    flow_selector: SelectorRef,
    handler_group: RwLock<Option<HeartbeatHandlerGroupRef>>,
    handler_group_builder: Mutex<Option<HeartbeatHandlerGroupBuilder>>,
    election: Option<ElectionRef>,
    procedure_manager: ProcedureManagerRef,
    mailbox: MailboxRef,
    procedure_executor: ProcedureExecutorRef,
    wal_options_allocator: WalOptionsAllocatorRef,
    table_metadata_manager: TableMetadataManagerRef,
    runtime_switch_manager: RuntimeSwitchManagerRef,
    memory_region_keeper: MemoryRegionKeeperRef,
    greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
    region_migration_manager: RegionMigrationManagerRef,
    region_supervisor_ticker: Option<RegionSupervisorTickerRef>,
    cache_invalidator: CacheInvalidatorRef,
    leader_region_registry: LeaderRegionRegistryRef,
    wal_prune_ticker: Option<WalPruneTickerRef>,

    plugins: Plugins,
}

impl Metasrv {
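    /// Starts the metasrv: builds the heartbeat handler group, initializes table
    /// metadata, and, when an election service is configured, spawns the leader
    /// election and candidate registration loops. Calling it again after a
    /// successful start is a no-op.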
    pub async fn try_start(&self) -> Result<()> {
        if self
            .started
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_err()
        {
            warn!("Metasrv already started");
            return Ok(());
        }

        let handler_group_builder =
            self.handler_group_builder
                .lock()
                .unwrap()
                .take()
                .context(error::UnexpectedSnafu {
                    violated: "expected heartbeat handler group builder",
                })?;
        *self.handler_group.write().unwrap() = Some(Arc::new(handler_group_builder.build()?));

        self.table_metadata_manager
            .init()
            .await
            .context(InitMetadataSnafu)?;

        if let Some(election) = self.election() {
            let procedure_manager = self.procedure_manager.clone();
            let in_memory = self.in_memory.clone();
            let leader_cached_kv_backend = self.leader_cached_kv_backend.clone();
            let subscribe_manager = self.subscription_manager();
            let mut rx = election.subscribe_leader_change();
            let greptimedb_telemetry_task = self.greptimedb_telemetry_task.clone();
            greptimedb_telemetry_task
                .start()
                .context(StartTelemetryTaskSnafu)?;

            // Register the listeners that must react to leadership changes.
            let mut leadership_change_notifier = LeadershipChangeNotifier::default();
            leadership_change_notifier.add_listener(self.wal_options_allocator.clone());
            leadership_change_notifier
                .add_listener(Arc::new(ProcedureManagerListenerAdapter(procedure_manager)));
            leadership_change_notifier.add_listener(Arc::new(NodeExpiryListener::new(
                self.options.node_max_idle_time,
                self.in_memory.clone(),
            )));
            if let Some(region_supervisor_ticker) = &self.region_supervisor_ticker {
                leadership_change_notifier.add_listener(region_supervisor_ticker.clone() as _);
            }
            if let Some(wal_prune_ticker) = &self.wal_prune_ticker {
                leadership_change_notifier.add_listener(wal_prune_ticker.clone() as _);
            }
            if let Some(customizer) = self.plugins.get::<LeadershipChangeNotifierCustomizerRef>() {
                customizer.customize(&mut leadership_change_notifier);
            }

            let state_handler = MetaStateHandler {
                greptimedb_telemetry_task,
                subscribe_manager,
                state: self.state.clone(),
                leader_cached_kv_backend: leader_cached_kv_backend.clone(),
                leadership_change_notifier,
            };
            let _handle = common_runtime::spawn_global(async move {
                loop {
                    match rx.recv().await {
                        Ok(msg) => {
                            in_memory.reset();
                            leader_cached_kv_backend.reset();
                            info!("Leader's cache has been cleared on leader change: {msg}");
                            match msg {
                                LeaderChangeMessage::Elected(_) => {
                                    state_handler.on_leader_start().await;
                                }
                                LeaderChangeMessage::StepDown(leader) => {
                                    error!("Leader {:?} stepped down", leader);

                                    state_handler.on_leader_stop().await;
                                }
                            }
                        }
                        Err(RecvError::Closed) => {
                            error!("Leader change channel closed unexpectedly, is the leader election loop still running?");
                            break;
                        }
                        Err(RecvError::Lagged(_)) => {
                            break;
                        }
                    }
                }

                state_handler.on_leader_stop().await;
            });

            {
                // Keep this node registered as an election candidate while running.
                let election = election.clone();
                let started = self.started.clone();
                let node_info = self.node_info();
                let _handle = common_runtime::spawn_global(async move {
                    while started.load(Ordering::Acquire) {
                        let res = election.register_candidate(&node_info).await;
                        if let Err(e) = res {
                            warn!(e; "Metasrv register candidate error");
                        }
                    }
                });
            }

            {
                // Campaign for leadership in a loop, restarting the campaign whenever it ends.
                let election = election.clone();
                let started = self.started.clone();
                let _handle = common_runtime::spawn_global(async move {
                    while started.load(Ordering::Acquire) {
                        let res = election.campaign().await;
                        if let Err(e) = res {
                            warn!(e; "Metasrv election error");
                        }
                        election.reset_campaign().await;
                        info!("Metasrv re-initiating election");
                    }
                    info!("Metasrv stopped");
                });
            }
        } else {
            warn!(
                "Ensure only one instance of Metasrv is running, as there is no election service."
            );

            if let Err(e) = self.wal_options_allocator.start().await {
                error!(e; "Failed to start wal options allocator");
            }
            self.leader_cached_kv_backend
                .load()
                .await
                .context(KvBackendSnafu)?;
            self.procedure_manager
                .start()
                .await
                .context(StartProcedureManagerSnafu)?;
        }

        info!("Metasrv started");

        Ok(())
    }

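    /// Shuts down the metasrv, stopping the procedure manager. Calling it again
    /// after a successful shutdown is a no-op.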
    pub async fn shutdown(&self) -> Result<()> {
        if self
            .started
            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
            .is_err()
        {
            warn!("Metasrv already stopped");
            return Ok(());
        }

        self.procedure_manager
            .stop()
            .await
            .context(StopProcedureManagerSnafu)?;

        info!("Metasrv stopped");

        Ok(())
    }

    pub fn start_time_ms(&self) -> u64 {
        self.start_time_ms
    }

    pub fn node_info(&self) -> MetasrvNodeInfo {
        let build_info = common_version::build_info();
        MetasrvNodeInfo {
            addr: self.options().grpc.server_addr.clone(),
            version: build_info.version.to_string(),
            git_commit: build_info.commit_short.to_string(),
            start_time_ms: self.start_time_ms(),
        }
    }

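    /// Looks up a datanode peer by `peer_id`, returning `None` if the datanode's
    /// lease has expired.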
    pub(crate) async fn lookup_datanode_peer(&self, peer_id: u64) -> Result<Option<Peer>> {
        lookup_datanode_peer(
            peer_id,
            &self.meta_peer_client,
            distributed_time_constants::DATANODE_LEASE_SECS,
        )
        .await
    }

    pub fn options(&self) -> &MetasrvOptions {
        &self.options
    }

    pub fn in_memory(&self) -> &ResettableKvBackendRef {
        &self.in_memory
    }

    pub fn kv_backend(&self) -> &KvBackendRef {
        &self.kv_backend
    }

    pub fn meta_peer_client(&self) -> &MetaPeerClientRef {
        &self.meta_peer_client
    }

    pub fn selector(&self) -> &SelectorRef {
        &self.selector
    }

    pub fn selector_ctx(&self) -> &SelectorContext {
        &self.selector_ctx
    }

    pub fn flow_selector(&self) -> &SelectorRef {
        &self.flow_selector
    }

    pub fn handler_group(&self) -> Option<HeartbeatHandlerGroupRef> {
        self.handler_group.read().unwrap().clone()
    }

    pub fn election(&self) -> Option<&ElectionRef> {
        self.election.as_ref()
    }

    pub fn mailbox(&self) -> &MailboxRef {
        &self.mailbox
    }

    pub fn procedure_executor(&self) -> &ProcedureExecutorRef {
        &self.procedure_executor
    }

    pub fn procedure_manager(&self) -> &ProcedureManagerRef {
        &self.procedure_manager
    }

    pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
        &self.table_metadata_manager
    }

    pub fn runtime_switch_manager(&self) -> &RuntimeSwitchManagerRef {
        &self.runtime_switch_manager
    }

    pub fn memory_region_keeper(&self) -> &MemoryRegionKeeperRef {
        &self.memory_region_keeper
    }

    pub fn region_migration_manager(&self) -> &RegionMigrationManagerRef {
        &self.region_migration_manager
    }

    pub fn publish(&self) -> Option<PublisherRef> {
        self.plugins.get::<PublisherRef>()
    }

    pub fn subscription_manager(&self) -> Option<SubscriptionManagerRef> {
        self.plugins.get::<SubscriptionManagerRef>()
    }

    pub fn plugins(&self) -> &Plugins {
        &self.plugins
    }

    #[inline]
    pub fn new_ctx(&self) -> Context {
        let server_addr = self.options().grpc.server_addr.clone();
        let in_memory = self.in_memory.clone();
        let kv_backend = self.kv_backend.clone();
        let leader_cached_kv_backend = self.leader_cached_kv_backend.clone();
        let meta_peer_client = self.meta_peer_client.clone();
        let mailbox = self.mailbox.clone();
        let election = self.election.clone();
        let table_metadata_manager = self.table_metadata_manager.clone();
        let cache_invalidator = self.cache_invalidator.clone();
        let leader_region_registry = self.leader_region_registry.clone();

        Context {
            server_addr,
            in_memory,
            kv_backend,
            leader_cached_kv_backend,
            meta_peer_client,
            mailbox,
            election,
            is_infancy: false,
            table_metadata_manager,
            cache_invalidator,
            leader_region_registry,
        }
    }
747}