frontend/instance/
log_handler.rs1use std::sync::Arc;
16
17use api::v1::RowInsertRequests;
18use async_trait::async_trait;
19use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
20use client::Output;
21use common_error::ext::BoxedError;
22use datatypes::timestamp::TimestampNanosecond;
23use pipeline::pipeline_operator::PipelineOperator;
24use pipeline::{Pipeline, PipelineInfo, PipelineVersion};
25use servers::error::{
26 AuthSnafu, Error as ServerError, ExecuteGrpcRequestSnafu, PipelineSnafu, Result as ServerResult,
27};
28use servers::interceptor::{LogIngestInterceptor, LogIngestInterceptorRef};
29use servers::query_handler::PipelineHandler;
30use session::context::{QueryContext, QueryContextRef};
31use snafu::ResultExt;
32use table::Table;
33
34use crate::instance::Instance;
35
36#[async_trait]
37impl PipelineHandler for Instance {
38 async fn insert(&self, log: RowInsertRequests, ctx: QueryContextRef) -> ServerResult<Output> {
39 self.plugins
40 .get::<PermissionCheckerRef>()
41 .as_ref()
42 .check_permission(ctx.current_user(), PermissionReq::LogWrite)
43 .context(AuthSnafu)?;
44
45 let log = self
46 .plugins
47 .get::<LogIngestInterceptorRef<ServerError>>()
48 .as_ref()
49 .pre_ingest(log, ctx.clone())?;
50
51 self.handle_log_inserts(log, ctx).await
52 }
53
54 async fn get_pipeline(
55 &self,
56 name: &str,
57 version: PipelineVersion,
58 query_ctx: QueryContextRef,
59 ) -> ServerResult<Arc<Pipeline>> {
60 self.pipeline_operator
61 .get_pipeline(query_ctx, name, version)
62 .await
63 .context(PipelineSnafu)
64 }
65
66 async fn insert_pipeline(
67 &self,
68 name: &str,
69 content_type: &str,
70 pipeline: &str,
71 query_ctx: QueryContextRef,
72 ) -> ServerResult<PipelineInfo> {
73 self.pipeline_operator
74 .insert_pipeline(name, content_type, pipeline, query_ctx)
75 .await
76 .context(PipelineSnafu)
77 }
78
79 async fn delete_pipeline(
80 &self,
81 name: &str,
82 version: PipelineVersion,
83 ctx: QueryContextRef,
84 ) -> ServerResult<Option<()>> {
85 self.pipeline_operator
86 .delete_pipeline(name, version, ctx)
87 .await
88 .context(PipelineSnafu)
89 }
90
91 async fn get_table(
92 &self,
93 table: &str,
94 query_ctx: &QueryContext,
95 ) -> std::result::Result<Option<Arc<Table>>, catalog::error::Error> {
96 let catalog = query_ctx.current_catalog();
97 let schema = query_ctx.current_schema();
98 self.catalog_manager
99 .table(catalog, &schema, table, None)
100 .await
101 }
102
103 fn build_pipeline(&self, pipeline: &str) -> ServerResult<Pipeline> {
104 PipelineOperator::build_pipeline(pipeline).context(PipelineSnafu)
105 }
106
107 async fn get_pipeline_str(
108 &self,
109 name: &str,
110 version: PipelineVersion,
111 query_ctx: QueryContextRef,
112 ) -> ServerResult<(String, TimestampNanosecond)> {
113 self.pipeline_operator
114 .get_pipeline_str(name, version, query_ctx)
115 .await
116 .context(PipelineSnafu)
117 }
118}
119
120impl Instance {
121 pub async fn handle_log_inserts(
122 &self,
123 log: RowInsertRequests,
124 ctx: QueryContextRef,
125 ) -> ServerResult<Output> {
126 self.inserter
127 .handle_log_inserts(log, ctx, self.statement_executor.as_ref())
128 .await
129 .map_err(BoxedError::new)
130 .context(ExecuteGrpcRequestSnafu)
131 }
132
133 pub async fn handle_trace_inserts(
134 &self,
135 rows: RowInsertRequests,
136 ctx: QueryContextRef,
137 ) -> ServerResult<Output> {
138 self.inserter
139 .handle_trace_inserts(rows, ctx, self.statement_executor.as_ref())
140 .await
141 .map_err(BoxedError::new)
142 .context(ExecuteGrpcRequestSnafu)
143 }
144}