18
18
package org .softwarefactory .keycloak .providers .events .mqtt ;
19
19
20
20
import java .util .HashSet ;
21
+ import java .util .Set ;
22
+ import java .util .logging .Level ;
23
+ import java .util .logging .Logger ;
24
+
25
+ import org .eclipse .paho .client .mqttv3 .IMqttClient ;
26
+ import org .eclipse .paho .client .mqttv3 .MqttClient ;
27
+ import org .eclipse .paho .client .mqttv3 .MqttConnectOptions ;
28
+ import org .eclipse .paho .client .mqttv3 .MqttException ;
29
+ import org .eclipse .paho .client .mqttv3 .MqttSecurityException ;
30
+ import org .eclipse .paho .client .mqttv3 .persist .MemoryPersistence ;
21
31
import org .keycloak .Config ;
22
32
import org .keycloak .events .EventListenerProvider ;
23
33
import org .keycloak .events .EventListenerProviderFactory ;
24
34
import org .keycloak .events .EventType ;
25
35
import org .keycloak .events .admin .OperationType ;
26
36
import org .keycloak .models .KeycloakSession ;
27
37
import org .keycloak .models .KeycloakSessionFactory ;
28
- import org .softwarefactory .keycloak .providers .events .models .Configuration ;
38
+ import org .softwarefactory .keycloak .providers .events .models .MQTTMessageOptions ;
29
39
30
40
/**
31
41
* @author <a href="mailto:mhuin@redhat.com">Matthieu Huin</a>
32
42
*/
33
43
public class MQTTEventListenerProviderFactory implements EventListenerProviderFactory {
44
+ private static final Logger logger = Logger .getLogger (MQTTEventListenerProviderFactory .class .getName ());
45
+ private static final String PUBLISHER_ID = "keycloak" ;
34
46
35
- private Configuration configuration ;
47
+ private IMqttClient client ;
48
+ private Set <EventType > excludedEvents ;
49
+ private Set <OperationType > excludedAdminOperations ;
50
+ private MQTTMessageOptions messageOptions ;
36
51
37
52
@ Override
38
53
public EventListenerProvider create (KeycloakSession session ) {
39
- return new MQTTEventListenerProvider (configuration );
54
+ return new MQTTEventListenerProvider (excludedEvents , excludedAdminOperations , messageOptions , client );
40
55
}
41
56
42
57
@ Override
43
58
public void init (Config .Scope config ) {
44
- configuration = new Configuration ();
45
- String [] excludes = config .getArray ("exclude-events" );
59
+ var excludes = config .getArray ("excludeEvents" );
46
60
if (excludes != null ) {
47
- configuration . excludedEvents = new HashSet <>();
61
+ excludedEvents = new HashSet <EventType >();
48
62
for (String e : excludes ) {
49
- configuration . excludedEvents .add (EventType .valueOf (e ));
63
+ excludedEvents .add (EventType .valueOf (e ));
50
64
}
51
65
}
52
66
53
67
String [] excludesOperations = config .getArray ("excludesOperations" );
54
68
if (excludesOperations != null ) {
55
- configuration . excludedAdminOperations = new HashSet <>();
69
+ excludedAdminOperations = new HashSet <OperationType >();
56
70
for (String e : excludesOperations ) {
57
- configuration . excludedAdminOperations .add (OperationType .valueOf (e ));
71
+ excludedAdminOperations .add (OperationType .valueOf (e ));
58
72
}
59
73
}
60
74
61
- configuration .serverUri = config .get ("serverUri" , "tcp://localhost:1883" );
62
- configuration .username = config .get ("username" , null );
63
- configuration .password = config .get ("password" , null );
64
- configuration .topic = config .get ("topic" , "keycloak/events" );
65
- configuration .usePersistence = config .getBoolean ("usePersistence" , false );
66
- configuration .retained = config .getBoolean ("retained" , true );
67
- configuration .cleanSession = config .getBoolean ("cleanSession" , true );
68
- configuration .qos = config .getInt ("qos" , 0 );
75
+ MqttConnectOptions options = new MqttConnectOptions ();
76
+ var serverUri = config .get ("serverUri" , "tcp://localhost:1883" );
77
+
78
+ MemoryPersistence persistence = null ;
79
+ if (config .getBoolean ("usePersistence" , false )) {
80
+ persistence = new MemoryPersistence ();
81
+ }
82
+
83
+ var username = config .get ("username" , null );
84
+ var password = config .get ("password" , null );
85
+ if (username != null && password != null ) {
86
+ options .setUserName (username );
87
+ options .setPassword (password .toCharArray ());
88
+ }
89
+ options .setAutomaticReconnect (true );
90
+ options .setCleanSession (config .getBoolean ("cleanSession" , true ));
91
+ options .setConnectionTimeout (10 );
92
+
93
+ messageOptions = new MQTTMessageOptions ();
94
+ messageOptions .topic = config .get ("topic" , "keycloak/events" );
95
+ messageOptions .retained = config .getBoolean ("retained" , true );
96
+ messageOptions .qos = config .getInt ("qos" , 0 );
97
+
98
+ try {
99
+ client = new MqttClient (serverUri , PUBLISHER_ID , persistence );
100
+ client .connect (options );
101
+ } catch (MqttSecurityException e ){
102
+ logger .log (Level .SEVERE , "Connection not secure!" , e );
103
+ } catch (MqttException e ){
104
+ logger .log (Level .SEVERE , "Connection could not be established!" , e );
105
+ }
69
106
}
70
107
71
108
@ Override
@@ -75,7 +112,11 @@ public void postInit(KeycloakSessionFactory factory) {
75
112
76
113
@ Override
77
114
public void close () {
78
- // not needed
115
+ try {
116
+ client .disconnect ();
117
+ } catch (MqttException e ) {
118
+ logger .log (Level .SEVERE , "Connection could not be closed!" , e );
119
+ }
79
120
}
80
121
81
122
@ Override
0 commit comments