cmd/
flownode.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::path::Path;
16use std::sync::Arc;
17use std::time::Duration;
18
19use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
20use catalog::information_extension::DistributedInformationExtension;
21use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend};
22use clap::Parser;
23use client::client_manager::NodeClients;
24use common_base::Plugins;
25use common_config::{Configurable, DEFAULT_DATA_HOME};
26use common_grpc::channel_manager::ChannelConfig;
27use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
28use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
29use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
30use common_meta::heartbeat::handler::HandlerGroupExecutor;
31use common_meta::key::flow::FlowMetadataManager;
32use common_meta::key::TableMetadataManager;
33use common_telemetry::info;
34use common_telemetry::logging::{TracingOptions, DEFAULT_LOGGING_DIR};
35use common_version::{short_version, version};
36use flow::{
37    get_flow_auth_options, FlownodeBuilder, FlownodeInstance, FlownodeServiceBuilder,
38    FrontendClient, FrontendInvoker,
39};
40use meta_client::{MetaClientOptions, MetaClientType};
41use snafu::{ensure, OptionExt, ResultExt};
42use tracing_appender::non_blocking::WorkerGuard;
43
44use crate::error::{
45    BuildCacheRegistrySnafu, InitMetadataSnafu, LoadLayeredConfigSnafu, MetaClientInitSnafu,
46    MissingConfigSnafu, Result, ShutdownFlownodeSnafu, StartFlownodeSnafu,
47};
48use crate::options::{GlobalOptions, GreptimeOptions};
49use crate::{create_resource_limit_metrics, log_versions, App};
50
/// Name under which the flownode application identifies itself; it is handed to logging
/// initialization, version logging and resource-limit metrics setup.
pub const APP_NAME: &str = "greptime-flownode";
52
/// Top-level options of the flownode command: [`GreptimeOptions`] wrapping the flownode
/// component options (accessed as `.component`), together with the shared `runtime` and
/// `plugins` sections.
type FlownodeOptions = GreptimeOptions<flow::FlownodeOptions>;
54
/// A runnable flownode application: the built [`FlownodeInstance`] together with everything
/// that has to live as long as it. Started and stopped through the [`App`] trait.
pub struct Instance {
    /// The flownode being run.
    flownode: FlownodeInstance,

    // The components of flownode, which make it easier to expand based
    // on the components.
    #[cfg(feature = "enterprise")]
    components: Components,

    // Keep the logging guard to prevent the worker from being dropped.
    // Rust drops struct fields in declaration order, so keeping this field last means the
    // guard is dropped after `flownode` — presumably so logs emitted during teardown are
    // still flushed. Do not move it above the other fields.
    _guard: Vec<WorkerGuard>,
}
66
/// Building blocks created while constructing the flownode, exposed in enterprise builds so
/// downstream code can extend the flownode with them.
#[cfg(feature = "enterprise")]
pub struct Components {
    /// The catalog manager the flownode was built with.
    pub catalog_manager: catalog::CatalogManagerRef,
    /// The `FrontendClient` the flownode was built with.
    pub fe_client: Arc<FrontendClient>,
    /// Metadata key-value backend; `StartCommand::build` fills it with the cached metasrv
    /// backend.
    pub kv_backend: common_meta::kv_backend::KvBackendRef,
}
73
74impl Instance {
75    pub fn new(
76        flownode: FlownodeInstance,
77        #[cfg(feature = "enterprise")] components: Components,
78        guard: Vec<WorkerGuard>,
79    ) -> Self {
80        Self {
81            flownode,
82            #[cfg(feature = "enterprise")]
83            components,
84            _guard: guard,
85        }
86    }
87
88    pub fn flownode(&self) -> &FlownodeInstance {
89        &self.flownode
90    }
91
92    /// allow customizing flownode for downstream projects
93    pub fn flownode_mut(&mut self) -> &mut FlownodeInstance {
94        &mut self.flownode
95    }
96
97    #[cfg(feature = "enterprise")]
98    pub fn components(&self) -> &Components {
99        &self.components
100    }
101}
102
103#[async_trait::async_trait]
104impl App for Instance {
105    fn name(&self) -> &str {
106        APP_NAME
107    }
108
109    async fn start(&mut self) -> Result<()> {
110        plugins::start_flownode_plugins(self.flownode.flow_engine().plugins().clone())
111            .await
112            .context(StartFlownodeSnafu)?;
113
114        self.flownode.start().await.context(StartFlownodeSnafu)
115    }
116
117    async fn stop(&mut self) -> Result<()> {
118        self.flownode
119            .shutdown()
120            .await
121            .context(ShutdownFlownodeSnafu)
122    }
123}
124
// Top-level flownode command; all work is delegated to the selected subcommand.
// Plain `//` comments are used on clap-derived items because clap renders `///` doc
// comments as `--help` text.
#[derive(Parser)]
pub struct Command {
    // The selected subcommand (currently only `start`).
    #[clap(subcommand)]
    subcmd: SubCommand,
}
130
131impl Command {
132    pub async fn build(&self, opts: FlownodeOptions) -> Result<Instance> {
133        self.subcmd.build(opts).await
134    }
135
136    pub fn load_options(&self, global_options: &GlobalOptions) -> Result<FlownodeOptions> {
137        match &self.subcmd {
138            SubCommand::Start(cmd) => cmd.load_options(global_options),
139        }
140    }
141}
142
// Subcommands of the flownode command. `//` instead of `///` so clap's help text is
// unaffected.
#[derive(Parser)]
enum SubCommand {
    // Build and run a flownode; arguments are described by `StartCommand`.
    Start(StartCommand),
}
147
148impl SubCommand {
149    async fn build(&self, opts: FlownodeOptions) -> Result<Instance> {
150        match self {
151            SubCommand::Start(cmd) => cmd.build(opts).await,
152        }
153    }
154}
155
// Command-line arguments of `flownode start`. Every field is either optional or has a clap
// default, so values not given on the command line fall back to the layered configuration
// loaded in `StartCommand::load_options`.
// NOTE: clap renders `///` doc comments on these fields as `--help` text; maintainer-only
// notes therefore use `//`.
#[derive(Debug, Parser, Default)]
struct StartCommand {
    /// Flownode's id
    #[clap(long)]
    node_id: Option<u64>,
    /// Bind address for the gRPC server.
    #[clap(long, alias = "rpc-addr")]
    rpc_bind_addr: Option<String>,
    /// The address advertised to the metasrv, and used for connections from outside the host.
    /// If left empty or unset, the server will automatically use the IP address of the first network interface
    /// on the host, with the same port number as the one specified in `rpc_bind_addr`.
    #[clap(long, alias = "rpc-hostname")]
    rpc_server_addr: Option<String>,
    /// Metasrv address list;
    #[clap(long, value_delimiter = ',', num_args = 1..)]
    metasrv_addrs: Option<Vec<String>>,
    /// The configuration file for flownode
    #[clap(short, long)]
    config_file: Option<String>,
    /// The prefix of environment variables, default is `GREPTIMEDB_FLOWNODE`;
    #[clap(long, default_value = "GREPTIMEDB_FLOWNODE")]
    env_prefix: String,
    // HTTP server address; overrides `http.addr` when given (no `--help` text is set for it).
    #[clap(long)]
    http_addr: Option<String>,
    /// HTTP request timeout in seconds.
    #[clap(long)]
    http_timeout: Option<u64>,
    // NOTE(review): this value is part of the `Debug` output that `build` logs at info level;
    // if the provider config embeds credentials (e.g. a static user list) they reach the
    // logs — confirm whether it should be redacted.
    /// User Provider cfg, for auth, currently only support static user provider
    #[clap(long)]
    user_provider: Option<String>,
}
187
impl StartCommand {
    /// Loads the layered options (from `config_file` and environment variables prefixed with
    /// `env_prefix`) and then applies the command-line overrides on top.
    ///
    /// # Errors
    ///
    /// Fails if the layered configuration cannot be loaded, or if `merge_with_cli_options`
    /// fails (e.g. no node id is available).
    fn load_options(&self, global_options: &GlobalOptions) -> Result<FlownodeOptions> {
        let mut opts = FlownodeOptions::load_layered_options(
            self.config_file.as_deref(),
            self.env_prefix.as_ref(),
        )
        .context(LoadLayeredConfigSnafu)?;

        self.merge_with_cli_options(global_options, &mut opts)?;

        Ok(opts)
    }

    // The precedence order is: cli > config file > environment variables > default values.
    // This function only applies the CLI layer (plus `global_options`); the ordering of the
    // lower layers is decided by `load_layered_options`.
    //
    // Errors: returns a missing-config error (`MissingConfigSnafu`) if no node id is set
    // after merging.
    fn merge_with_cli_options(
        &self,
        global_options: &GlobalOptions,
        opts: &mut FlownodeOptions,
    ) -> Result<()> {
        // Only the flownode component section is touched here.
        let opts = &mut opts.component;

        if let Some(dir) = &global_options.log_dir {
            opts.logging.dir.clone_from(dir);
        }

        // If the logging dir is not set, use the default logs dir in the data home.
        if opts.logging.dir.is_empty() {
            opts.logging.dir = Path::new(DEFAULT_DATA_HOME)
                .join(DEFAULT_LOGGING_DIR)
                .to_string_lossy()
                .to_string();
        }

        if global_options.log_level.is_some() {
            opts.logging.level.clone_from(&global_options.log_level);
        }

        // Tracing options are always replaced wholesale from the global options.
        opts.tracing = TracingOptions {
            #[cfg(feature = "tokio-console")]
            tokio_console_addr: global_options.tokio_console_addr.clone(),
        };

        if let Some(addr) = &self.rpc_bind_addr {
            opts.grpc.bind_addr.clone_from(addr);
        }

        if let Some(server_addr) = &self.rpc_server_addr {
            opts.grpc.server_addr.clone_from(server_addr);
        }

        if let Some(node_id) = self.node_id {
            opts.node_id = Some(node_id);
        }

        // Creates a default meta client section if the config had none, so the CLI
        // addresses always have somewhere to go.
        if let Some(metasrv_addrs) = &self.metasrv_addrs {
            opts.meta_client
                .get_or_insert_with(MetaClientOptions::default)
                .metasrv_addrs
                .clone_from(metasrv_addrs);
        }

        if let Some(http_addr) = &self.http_addr {
            opts.http.addr.clone_from(http_addr);
        }

        if let Some(http_timeout) = self.http_timeout {
            opts.http.timeout = Duration::from_secs(http_timeout);
        }

        if let Some(user_provider) = &self.user_provider {
            opts.user_provider = Some(user_provider.clone());
        }

        // A flownode cannot run without an id; fail early at option-loading time.
        ensure!(
            opts.node_id.is_some(),
            MissingConfigSnafu {
                msg: "Missing node id option"
            }
        );

        Ok(())
    }

    // Builds a flownode `Instance` from fully merged options.
    //
    // Initializes the global runtimes and logging, creates the metasrv client, assembles the
    // metadata caches, catalog manager, heartbeat task and frontend client, builds the
    // flownode with its gRPC/HTTP services, and finally wires a frontend invoker into the
    // streaming engine. The flownode itself is started later by `App::start`.
    //
    // Errors: fails if `node_id` or `meta_client` options are missing, or if plugin setup,
    // metasrv client creation, cache registry construction, metadata init, flownode/service
    // build or invoker build fail.
    async fn build(&self, opts: FlownodeOptions) -> Result<Instance> {
        common_runtime::init_global_runtimes(&opts.runtime);

        // The returned guards are stored in the `Instance` so the logging workers stay alive.
        let guard = common_telemetry::init_global_logging(
            APP_NAME,
            &opts.component.logging,
            &opts.component.tracing,
            opts.component.node_id.map(|x| x.to_string()),
            None,
        );

        log_versions(version(), short_version(), APP_NAME);
        create_resource_limit_metrics(APP_NAME);

        // NOTE(review): both `Debug` dumps include `user_provider`; if that config carries
        // credentials they are written to the info log — confirm whether to redact.
        info!("Flownode start command: {:#?}", self);
        info!("Flownode options: {:#?}", opts);

        // Split the options: plugin options go to plugin setup, the rest is flownode config.
        let plugin_opts = opts.plugins;
        let mut opts = opts.component;
        // Presumably fills in `grpc.server_addr` when it was left empty (see the
        // `rpc_server_addr` help text) — must run before the options are used below.
        opts.grpc.detect_server_addr();

        let mut plugins = Plugins::new();
        plugins::setup_flownode_plugins(&mut plugins, &plugin_opts, &opts)
            .await
            .context(StartFlownodeSnafu)?;

        let member_id = opts
            .node_id
            .context(MissingConfigSnafu { msg: "'node_id'" })?;

        let meta_config = opts.meta_client.as_ref().context(MissingConfigSnafu {
            msg: "'meta_client_options'",
        })?;

        let meta_client = meta_client::create_meta_client(
            MetaClientType::Flownode { member_id },
            meta_config,
            None,
            None,
        )
        .await
        .context(MetaClientInitSnafu)?;

        let cache_max_capacity = meta_config.metadata_cache_max_capacity;
        let cache_ttl = meta_config.metadata_cache_ttl;
        let cache_tti = meta_config.metadata_cache_tti;

        // TODO(discord9): add helper function to ease the creation of cache registry&such
        // Metasrv-backed KV backend wrapped in a cache configured from the meta client options;
        // it is shared by the catalog, table/flow metadata managers and the invoker below.
        let cached_meta_backend =
            CachedKvBackendBuilder::new(Arc::new(MetaKvBackend::new(meta_client.clone())))
                .cache_max_capacity(cache_max_capacity)
                .cache_ttl(cache_ttl)
                .cache_tti(cache_tti)
                .build();
        let cached_meta_backend = Arc::new(cached_meta_backend);

        // Builds cache registry: the cached KV backend layer, then the fundamental registry,
        // then the default composite caches on top.
        let layered_cache_builder = LayeredCacheRegistryBuilder::default().add_cache_registry(
            CacheRegistryBuilder::default()
                .add_cache(cached_meta_backend.clone())
                .build(),
        );
        // Note: the fundamental registry gets its own `MetaKvBackend`, not the one wrapped by
        // `CachedKvBackendBuilder` above.
        let fundamental_cache_registry =
            build_fundamental_cache_registry(Arc::new(MetaKvBackend::new(meta_client.clone())));
        let layered_cache_registry = Arc::new(
            with_default_composite_cache_registry(
                layered_cache_builder.add_cache_registry(fundamental_cache_registry),
            )
            .context(BuildCacheRegistrySnafu)?
            .build(),
        );

        let information_extension =
            Arc::new(DistributedInformationExtension::new(meta_client.clone()));
        let catalog_manager = KvBackendCatalogManager::new(
            information_extension,
            cached_meta_backend.clone(),
            layered_cache_registry.clone(),
            None,
            None,
        );

        let table_metadata_manager =
            Arc::new(TableMetadataManager::new(cached_meta_backend.clone()));
        table_metadata_manager
            .init()
            .await
            .context(InitMetadataSnafu)?;

        // Heartbeat message handlers; `InvalidateCacheHandler` is given the layered cache
        // registry, presumably so metasrv instructions can invalidate cached entries.
        let executor = HandlerGroupExecutor::new(vec![
            Arc::new(ParseMailboxMessageHandler),
            Arc::new(InvalidateCacheHandler::new(layered_cache_registry.clone())),
        ]);

        let heartbeat_task = flow::heartbeat::HeartbeatTask::new(
            &opts,
            meta_client.clone(),
            opts.heartbeat.clone(),
            Arc::new(executor),
        );

        let flow_metadata_manager = Arc::new(FlowMetadataManager::new(cached_meta_backend.clone()));
        let flow_auth_header = get_flow_auth_options(&opts).context(StartFlownodeSnafu)?;
        let frontend_client = FrontendClient::from_meta_client(
            meta_client.clone(),
            flow_auth_header,
            opts.query.clone(),
        );
        let frontend_client = Arc::new(frontend_client);
        let flownode_builder = FlownodeBuilder::new(
            opts.clone(),
            plugins,
            table_metadata_manager,
            catalog_manager.clone(),
            flow_metadata_manager,
            frontend_client.clone(),
        )
        .with_heartbeat_task(heartbeat_task);

        let mut flownode = flownode_builder.build().await.context(StartFlownodeSnafu)?;
        let services = FlownodeServiceBuilder::new(&opts)
            .with_default_grpc_server(flownode.flownode_server())
            .enable_http_service()
            .build()
            .context(StartFlownodeSnafu)?;
        flownode.setup_services(services);
        // Rebind immutably: no further mutation of `flownode` is needed.
        let flownode = flownode;

        // flownode's frontend to datanode need not timeout.
        // Some queries are expected to take long time.
        let channel_config = ChannelConfig {
            timeout: None,
            ..Default::default()
        };
        let client = Arc::new(NodeClients::new(channel_config));

        // The invoker is built from the already-constructed streaming engine and then handed
        // back to that same engine, hence it can only be created after the flownode exists.
        let invoker = FrontendInvoker::build_from(
            flownode.flow_engine().streaming_engine(),
            catalog_manager.clone(),
            cached_meta_backend.clone(),
            layered_cache_registry.clone(),
            meta_client.clone(),
            client,
        )
        .await
        .context(StartFlownodeSnafu)?;
        flownode
            .flow_engine()
            .streaming_engine()
            // TODO(discord9): refactor and avoid circular reference
            .set_frontend_invoker(invoker)
            .await;

        #[cfg(feature = "enterprise")]
        let components = Components {
            catalog_manager: catalog_manager.clone(),
            fe_client: frontend_client,
            kv_backend: cached_meta_backend,
        };

        // Exactly one of the two returns below is compiled in, depending on the
        // `enterprise` feature.
        #[cfg(not(feature = "enterprise"))]
        return Ok(Instance::new(flownode, guard));
        #[cfg(feature = "enterprise")]
        Ok(Instance::new(flownode, components, guard))
    }
}