@@ -18,6 +18,7 @@ package client
18
18
19
19
import (
20
20
"bytes"
21
+ "context"
21
22
"encoding/json"
22
23
"errors"
23
24
"fmt"
@@ -38,6 +39,7 @@ import (
38
39
type EventClientConfig struct {
39
40
DestinationURL string `env:"EVENT_URL" envDefault:"http://localhost:3000/notify" description:"Notifier service url"`
40
41
NotificationMedium NotificationMedium `env:"NOTIFICATION_MEDIUM" envDefault:"rest" description:"notification medium"`
42
+ EnableNotifierV2 bool `env:"ENABLE_NOTIFIER_V2" envDefault:"false" description:"enable notifier v2"`
41
43
}
42
44
type NotificationMedium string
43
45
@@ -58,24 +60,25 @@ type EventClient interface {
58
60
}
59
61
60
62
type Event struct {
61
- EventTypeId int `json:"eventTypeId"`
62
- EventName string `json:"eventName"`
63
- PipelineId int `json:"pipelineId"`
64
- PipelineType string `json:"pipelineType"`
65
- CorrelationId string `json:"correlationId"`
66
- Payload * Payload `json:"payload"`
67
- EventTime string `json:"eventTime"`
68
- TeamId int `json:"teamId"`
69
- AppId int `json:"appId"`
70
- EnvId int `json:"envId"`
71
- IsProdEnv bool `json:"isProdEnv"`
72
- ClusterId int `json:"clusterId"`
73
- CdWorkflowType bean.WorkflowType `json:"cdWorkflowType,omitempty"`
74
- CdWorkflowRunnerId int `json:"cdWorkflowRunnerId"`
75
- CiWorkflowRunnerId int `json:"ciWorkflowRunnerId"`
76
- CiArtifactId int `json:"ciArtifactId"`
77
- BaseUrl string `json:"baseUrl"`
78
- UserId int `json:"-"`
63
+ EventTypeId int `json:"eventTypeId"`
64
+ EventName string `json:"eventName"`
65
+ PipelineId int `json:"pipelineId"`
66
+ PipelineType string `json:"pipelineType"`
67
+ CorrelationId string `json:"correlationId"`
68
+ Payload * Payload `json:"payload"`
69
+ EventTime string `json:"eventTime"`
70
+ TeamId int `json:"teamId"`
71
+ AppId int `json:"appId"`
72
+ EnvId int `json:"envId"`
73
+ IsProdEnv bool `json:"isProdEnv"`
74
+ ClusterId int `json:"clusterId"`
75
+ CdWorkflowType bean.WorkflowType `json:"cdWorkflowType,omitempty"`
76
+ CdWorkflowRunnerId int `json:"cdWorkflowRunnerId"`
77
+ CiWorkflowRunnerId int `json:"ciWorkflowRunnerId"`
78
+ CiArtifactId int `json:"ciArtifactId"`
79
+ EnvIdsForCiPipeline []int `json:"envIdsForCiPipeline"`
80
+ BaseUrl string `json:"baseUrl"`
81
+ UserId int `json:"-"`
79
82
}
80
83
81
84
type Payload struct {
@@ -95,22 +98,25 @@ type Payload struct {
95
98
}
96
99
97
100
type EventRESTClientImpl struct {
98
- logger * zap.SugaredLogger
99
- client * http.Client
100
- config * EventClientConfig
101
- pubsubClient * pubsub.PubSubClientServiceImpl
102
- ciPipelineRepository pipelineConfig.CiPipelineRepository
103
- pipelineRepository pipelineConfig.PipelineRepository
104
- attributesRepository repository.AttributesRepository
105
- moduleService module.ModuleService
101
+ logger * zap.SugaredLogger
102
+ client * http.Client
103
+ config * EventClientConfig
104
+ pubsubClient * pubsub.PubSubClientServiceImpl
105
+ ciPipelineRepository pipelineConfig.CiPipelineRepository
106
+ pipelineRepository pipelineConfig.PipelineRepository
107
+ attributesRepository repository.AttributesRepository
108
+ moduleService module.ModuleService
109
+ notificationSettingsRepository repository.NotificationSettingsRepository
106
110
}
107
111
108
112
func NewEventRESTClientImpl (logger * zap.SugaredLogger , client * http.Client , config * EventClientConfig , pubsubClient * pubsub.PubSubClientServiceImpl ,
109
113
ciPipelineRepository pipelineConfig.CiPipelineRepository , pipelineRepository pipelineConfig.PipelineRepository ,
110
- attributesRepository repository.AttributesRepository , moduleService module.ModuleService ) * EventRESTClientImpl {
114
+ attributesRepository repository.AttributesRepository , moduleService module.ModuleService ,
115
+ notificationSettingsRepository repository.NotificationSettingsRepository ) * EventRESTClientImpl {
111
116
return & EventRESTClientImpl {logger : logger , client : client , config : config , pubsubClient : pubsubClient ,
112
117
ciPipelineRepository : ciPipelineRepository , pipelineRepository : pipelineRepository ,
113
- attributesRepository : attributesRepository , moduleService : moduleService }
118
+ attributesRepository : attributesRepository , moduleService : moduleService ,
119
+ notificationSettingsRepository : notificationSettingsRepository }
114
120
}
115
121
116
122
func (impl * EventRESTClientImpl ) buildFinalPayload (event Event , cdPipeline * pipelineConfig.Pipeline , ciPipeline * pipelineConfig.CiPipeline ) * Payload {
@@ -235,34 +241,131 @@ func (impl *EventRESTClientImpl) sendEventsOnNats(body []byte) error {
235
241
// do not call this method if notification module is not installed
236
242
func (impl * EventRESTClientImpl ) sendEvent (event Event ) (bool , error ) {
237
243
impl .logger .Debugw ("event before send" , "event" , event )
238
- body , err := json .Marshal (event )
244
+
245
+ // Step 1: Create payload and destination URL based on config
246
+ bodyBytes , destinationUrl , err := impl .createPayloadAndDestination (event )
239
247
if err != nil {
240
- impl .logger .Errorw ("error while marshaling event request " , "err" , err )
241
248
return false , err
242
249
}
250
+
251
+ // Step 2: Send via appropriate medium (NATS or REST)
252
+ return impl .deliverEvent (bodyBytes , destinationUrl )
253
+ }
254
+
255
+ func (impl * EventRESTClientImpl ) createPayloadAndDestination (event Event ) ([]byte , string , error ) {
256
+ if impl .config .EnableNotifierV2 {
257
+ return impl .createV2PayloadAndDestination (event )
258
+ }
259
+ return impl .createDefaultPayloadAndDestination (event )
260
+ }
261
+
262
+ func (impl * EventRESTClientImpl ) createV2PayloadAndDestination (event Event ) ([]byte , string , error ) {
263
+ destinationUrl := impl .config .DestinationURL + "/v2"
264
+
265
+ // Fetch notification settings
266
+ req := repository.GetRulesRequest {
267
+ TeamId : event .TeamId ,
268
+ EnvId : event .EnvId ,
269
+ AppId : event .AppId ,
270
+ PipelineId : event .PipelineId ,
271
+ PipelineType : event .PipelineType ,
272
+ IsProdEnv : & event .IsProdEnv ,
273
+ ClusterId : event .ClusterId ,
274
+ EnvIdsForCiPipeline : event .EnvIdsForCiPipeline ,
275
+ }
276
+ notificationSettings , err := impl .notificationSettingsRepository .FindNotificationSettingsWithRules (
277
+ context .Background (), event .EventTypeId , req ,
278
+ )
279
+ if err != nil {
280
+ impl .logger .Errorw ("error while fetching notification settings" , "err" , err )
281
+ return nil , "" , err
282
+ }
283
+
284
+ // Process notification settings into beans
285
+ notificationSettingsBean , err := impl .processNotificationSettings (notificationSettings )
286
+ if err != nil {
287
+ return nil , "" , err
288
+ }
289
+
290
+ // Create combined payload
291
+ combinedPayload := map [string ]interface {}{
292
+ "event" : event ,
293
+ "notificationSettings" : notificationSettingsBean ,
294
+ }
295
+
296
+ bodyBytes , err := json .Marshal (combinedPayload )
297
+ if err != nil {
298
+ impl .logger .Errorw ("error while marshaling combined event request" , "err" , err )
299
+ return nil , "" , err
300
+ }
301
+
302
+ return bodyBytes , destinationUrl , nil
303
+ }
304
+
305
+ func (impl * EventRESTClientImpl ) createDefaultPayloadAndDestination (event Event ) ([]byte , string , error ) {
306
+ bodyBytes , err := json .Marshal (event )
307
+ if err != nil {
308
+ impl .logger .Errorw ("error while marshaling event request" , "err" , err )
309
+ return nil , "" , err
310
+ }
311
+ return bodyBytes , impl .config .DestinationURL , nil
312
+ }
313
+
314
+ func (impl * EventRESTClientImpl ) processNotificationSettings (notificationSettings []repository.NotificationSettings ) ([]* repository.NotificationSettingsBean , error ) {
315
+ notificationSettingsBean := make ([]* repository.NotificationSettingsBean , 0 )
316
+ for _ , item := range notificationSettings {
317
+ config := make ([]repository.ConfigEntry , 0 )
318
+ if item .Config != "" {
319
+ if err := json .Unmarshal ([]byte (item .Config ), & config ); err != nil {
320
+ impl .logger .Errorw ("error while unmarshaling config" , "err" , err )
321
+ return nil , err
322
+ }
323
+ }
324
+ notificationSettingsBean = append (notificationSettingsBean , & repository.NotificationSettingsBean {
325
+ Id : item .Id ,
326
+ TeamId : item .TeamId ,
327
+ AppId : item .AppId ,
328
+ EnvId : item .EnvId ,
329
+ PipelineId : item .PipelineId ,
330
+ PipelineType : item .PipelineType ,
331
+ EventTypeId : item .EventTypeId ,
332
+ Config : config ,
333
+ ViewId : item .ViewId ,
334
+ })
335
+ }
336
+ return notificationSettingsBean , nil
337
+ }
338
+
339
+ func (impl * EventRESTClientImpl ) deliverEvent (bodyBytes []byte , destinationUrl string ) (bool , error ) {
243
340
if impl .config .NotificationMedium == PUB_SUB {
244
- err = impl .sendEventsOnNats (body )
245
- if err != nil {
246
- impl .logger .Errorw ("error while publishing event " , "err" , err )
341
+ if err := impl .sendEventsOnNats (bodyBytes ); err != nil {
342
+ impl .logger .Errorw ("error while publishing event" , "err" , err )
247
343
return false , err
248
344
}
249
345
return true , nil
250
346
}
251
- var reqBody = [] byte ( body )
252
- req , err := http .NewRequest (http .MethodPost , impl . config . DestinationURL , bytes .NewBuffer (reqBody ))
347
+
348
+ req , err := http .NewRequest (http .MethodPost , destinationUrl , bytes .NewBuffer (bodyBytes ))
253
349
if err != nil {
254
- impl .logger .Errorw ("error while writing event " , "err" , err )
350
+ impl .logger .Errorw ("error while creating HTTP request " , "err" , err )
255
351
return false , err
256
352
}
257
353
req .Header .Set ("Content-Type" , "application/json" )
354
+
258
355
resp , err := impl .client .Do (req )
259
356
if err != nil {
260
- impl .logger .Errorw ("error while UpdateJiraTransition request " , "err" , err )
357
+ impl .logger .Errorw ("error while sending HTTP request " , "err" , err )
261
358
return false , err
262
359
}
263
360
defer resp .Body .Close ()
264
- impl .logger .Debugw ("event completed" , "event resp" , resp )
265
- return true , err
361
+
362
+ if resp .StatusCode >= 300 {
363
+ impl .logger .Errorw ("unexpected response from notifier" , "status" , resp .StatusCode )
364
+ return false , fmt .Errorf ("unexpected response code: %d" , resp .StatusCode )
365
+ }
366
+
367
+ impl .logger .Debugw ("event successfully delivered" , "status" , resp .StatusCode )
368
+ return true , nil
266
369
}
267
370
268
371
func (impl * EventRESTClientImpl ) WriteNatsEvent (topic string , payload interface {}) error {
0 commit comments