1#![feature(assert_matches)]
16
17use async_trait::async_trait;
18use common_error::ext::ErrorExt;
19use common_error::status_code::StatusCode;
20use common_mem_prof::activate_heap_profile;
21use common_stat::{get_total_cpu_millicores, get_total_memory_bytes};
22use common_telemetry::{error, info, warn};
23
24use crate::error::Result;
25
26pub mod cli;
27pub mod datanode;
28pub mod error;
29pub mod flownode;
30pub mod frontend;
31pub mod metasrv;
32pub mod options;
33pub mod standalone;
34
35lazy_static::lazy_static! {
36 static ref APP_VERSION: prometheus::IntGaugeVec =
37 prometheus::register_int_gauge_vec!("greptime_app_version", "app version", &["version", "short_version", "app"]).unwrap();
38
39 static ref CPU_LIMIT: prometheus::IntGaugeVec =
40 prometheus::register_int_gauge_vec!("greptime_cpu_limit_in_millicores", "cpu limit in millicores", &["app"]).unwrap();
41
42 static ref MEMORY_LIMIT: prometheus::IntGaugeVec =
43 prometheus::register_int_gauge_vec!("greptime_memory_limit_in_bytes", "memory limit in bytes", &["app"]).unwrap();
44}
45
46#[cfg(unix)]
48async fn start_wait_for_close_signal() -> std::io::Result<()> {
49 use tokio::signal::unix::{SignalKind, signal};
50 let mut sigint = signal(SignalKind::interrupt())?;
51 let mut sigterm = signal(SignalKind::terminate())?;
52
53 tokio::select! {
54 _ = sigint.recv() => {
55 info!("Received SIGINT, shutting down");
56 }
57 _ = sigterm.recv() => {
58 info!("Received SIGTERM, shutting down");
59 }
60 }
61
62 Ok(())
63}
64
/// Waits for a close signal on non-Unix platforms.
///
/// Completes with whatever `tokio::signal::ctrl_c` resolves to.
#[cfg(not(unix))]
async fn start_wait_for_close_signal() -> std::io::Result<()> {
    use tokio::signal::ctrl_c;

    ctrl_c().await
}
70
71#[async_trait]
72pub trait App: Send {
73 fn name(&self) -> &str;
74
75 async fn pre_start(&mut self) -> Result<()> {
77 Ok(())
78 }
79
80 async fn start(&mut self) -> Result<()>;
81
82 fn wait_signal(&self) -> bool {
84 true
85 }
86
87 async fn stop(&mut self) -> Result<()>;
88
89 async fn run(&mut self) -> Result<()> {
90 info!("Starting app: {}", self.name());
91
92 self.pre_start().await?;
93
94 self.start().await?;
95
96 if self.wait_signal()
97 && let Err(e) = start_wait_for_close_signal().await
98 {
99 error!(e; "Failed to listen for close signal");
100 }
104
105 self.stop().await?;
106 info!("Goodbye!");
107 Ok(())
108 }
109}
110
111pub fn log_versions(version: &str, short_version: &str, app: &str) {
116 APP_VERSION
118 .with_label_values(&[common_version::version(), short_version, app])
119 .inc();
120
121 info!("GreptimeDB version: {}", version);
123
124 log_env_flags();
125}
126
127pub fn create_resource_limit_metrics(app: &str) {
128 let cpu_limit = get_total_cpu_millicores();
129 if cpu_limit > 0 {
130 info!(
131 "GreptimeDB start with cpu limit in millicores: {}",
132 cpu_limit
133 );
134 CPU_LIMIT.with_label_values(&[app]).set(cpu_limit);
135 }
136
137 let memory_limit = get_total_memory_bytes();
138 if memory_limit > 0 {
139 info!(
140 "GreptimeDB start with memory limit in bytes: {}",
141 memory_limit
142 );
143 MEMORY_LIMIT.with_label_values(&[app]).set(memory_limit);
144 }
145}
146
147fn log_env_flags() {
148 info!("command line arguments");
149 for argument in std::env::args() {
150 info!("argument: {}", argument);
151 }
152}
153
154pub fn maybe_activate_heap_profile(memory_options: &common_options::memory::MemoryOptions) {
155 if memory_options.enable_heap_profiling {
156 match activate_heap_profile() {
157 Ok(()) => {
158 info!("Heap profile is active");
159 }
160 Err(err) => {
161 if err.status_code() == StatusCode::Unsupported {
162 info!("Heap profile is not supported");
163 } else {
164 warn!(err; "Failed to activate heap profile");
165 }
166 }
167 }
168 }
169}