-
Notifications
You must be signed in to change notification settings - Fork 9
Expand file tree
/
Copy pathBufferedActivityHandler.java
More file actions
155 lines (132 loc) · 5.83 KB
/
BufferedActivityHandler.java
File metadata and controls
155 lines (132 loc) · 5.83 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
package com.perimeterx.api.activities;
import com.perimeterx.http.PXClient;
import com.perimeterx.models.PXContext;
import com.perimeterx.models.activities.*;
import com.perimeterx.models.configuration.PXConfiguration;
import com.perimeterx.models.exceptions.PXException;
import com.perimeterx.utils.Constants;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import static com.perimeterx.utils.PXCommonUtils.logTime;
import static com.perimeterx.utils.PXLogger.LogReason.ERROR_TELEMETRY_EXCEPTION;
/**
 * Buffers activities and sends them to PX servers in batches once the buffer is full.
 * <p>
 * Activities are appended to a single concurrent queue and counted with an atomic counter.
 * When the counter exceeds {@code maxBufferLength}, one thread (elected with
 * {@link ReentrantLock#tryLock()}) drains a bounded batch from the queue and hands it to the
 * {@link PXClient}. The queue instance is never replaced, so an activity that is added while
 * a drain is in progress stays buffered and goes out with a later batch.
 * <p>
 * Created by nitzangoldfeder on 05/03/2017.
 */
public class BufferedActivityHandler implements ActivityHandler {

    /** Number of activities a single batch may hold on top of {@code maxBufferLength}. */
    private static final int BATCH_HEADROOM = 10;

    private final int maxBufferLength;
    private final ConcurrentLinkedQueue<Activity> bufferedActivities = new ConcurrentLinkedQueue<>();
    private final AtomicInteger counter = new AtomicInteger(0);
    private final PXConfiguration configuration;
    private final PXClient client;
    private final ReentrantLock lock = new ReentrantLock();

    /**
     * @param client        client used to send telemetry and activity batches
     * @param configuration configuration supplying the app id, the buffer length
     *                      ({@code getMaxBufferLen()}) and the raw-username flag
     */
    public BufferedActivityHandler(PXClient client, PXConfiguration configuration) {
        this.configuration = configuration;
        this.client = client;
        this.maxBufferLength = configuration.getMaxBufferLen();
    }

    /**
     * Creates a {@code Constants.ACTIVITY_BLOCKED} activity for the context and buffers it.
     *
     * @param context request context the activity is built from
     * @throws PXException if buffering triggers a batch send and that send fails
     */
    @Override
    public void handleBlockActivity(PXContext context) throws PXException {
        logTime("handleBlockActivity", () -> {
            Activity activity = ActivityFactory.createActivity(Constants.ACTIVITY_BLOCKED, configuration.getAppId(), context);
            handleSendActivities(activity);
        });
    }

    /**
     * Creates a {@code Constants.ACTIVITY_PAGE_REQUESTED} activity for the context and buffers it.
     *
     * @param context request context the activity is built from
     * @throws PXException if buffering triggers a batch send and that send fails
     */
    @Override
    public void handlePageRequestedActivity(PXContext context) throws PXException {
        logTime("handlePageRequestedActivity", () -> {
            Activity activity = ActivityFactory.createActivity(Constants.ACTIVITY_PAGE_REQUESTED, configuration.getAppId(), context);
            handleSendActivities(activity);
        });
    }

    /**
     * Builds an enforcer telemetry message from the given configuration and sends it
     * directly through the client; telemetry is not buffered.
     *
     * @param pxConfig     configuration reported in the telemetry (also supplies the app id)
     * @param updateReason reason attached to the telemetry details
     * @throws PXException wrapping the {@link IOException} if sending fails
     */
    @Override
    public void handleEnforcerTelemetryActivity(PXConfiguration pxConfig, UpdateReason updateReason) throws PXException {
        logTime("handleEnforcerTelemetryActivity", () -> {
            try {
                EnforcerTelemetryActivityDetails details = new EnforcerTelemetryActivityDetails(pxConfig, updateReason);
                EnforcerTelemetry enforcerTelemetry = new EnforcerTelemetry("enforcer_telemetry", pxConfig.getAppId(), details);
                this.client.sendEnforcerTelemetry(enforcerTelemetry);
            } catch (IOException e) {
                throw new PXException(ERROR_TELEMETRY_EXCEPTION.toString(), e);
            }
        });
    }

    /**
     * Creates an additional-S2S activity for the context and buffers it.
     *
     * @param context request context the activity is built from
     * @throws PXException if buffering triggers a batch send and that send fails
     */
    @Override
    public void handleAdditionalS2SActivity(PXContext context) throws PXException {
        logTime("handleAdditionalS2SActivity", () -> {
            final Activity activity = createAdditionalS2SActivity(context);
            handleSendActivities(activity);
        });
    }

    /**
     * Creates a {@code Constants.ACTIVITY_ADDITIONAL_S2S} activity, attaching the raw username
     * from the context's login credentials when {@link #isRequireRawUsername(PXContext)} holds.
     *
     * @param context request context the activity is built from
     * @return the created activity (not buffered by this method)
     */
    public Activity createAdditionalS2SActivity(PXContext context) {
        return logTime("createAdditionalS2SActivity", () -> {
            final Activity activity = ActivityFactory.createActivity(Constants.ACTIVITY_ADDITIONAL_S2S, configuration.getAppId(), context);
            if (isRequireRawUsername(context)) {
                ((AdditionalS2SActivityDetails) activity.getDetails())
                        .setUsername(context.getLoginData().getLoginCredentials().getRawUsername());
            }
            return activity;
        });
    }

    /**
     * Decides whether the raw username is attached to the additional-S2S activity: the login
     * result must be either unknown ({@code null}) or successful, the account must be flagged
     * as breached, and the configuration flag must be enabled.
     */
    private boolean isRequireRawUsername(PXContext context) {
        // Read the nullable flag once; null means no login result was recorded on the context.
        final Boolean loginSuccessful = context.getLoginData().getLoginSuccessful();
        return (loginSuccessful == null || loginSuccessful)
                && context.isBreachedAccount()
                && configuration.isAddRawUsernameOnAdditionalS2SActivity();
    }

    /**
     * Appends the activity to the buffer and triggers a flush once more than
     * {@code maxBufferLength} activities have been counted.
     */
    private void handleSendActivities(Activity activity) throws PXException {
        logTime("handleSendActivities", () -> {
            bufferedActivities.add(activity);
            if (counter.incrementAndGet() > maxBufferLength) {
                handleOverflow();
            }
        });
    }

    /**
     * Drains one batch and sends it. Only the thread that wins {@code tryLock()} drains; other
     * threads return immediately and leave their activity in the buffer. The lock is held for
     * the drain only, never while the batch is being sent.
     */
    private void handleOverflow() throws PXException {
        logTime("handleOverflow", () -> {
            if (!lock.tryLock()) {
                return;
            }
            final List<Activity> batch;
            try {
                batch = flush();
            } finally {
                lock.unlock();
            }
            sendAsync(batch);
        });
    }

    /**
     * Hands a batch to the client; null or empty batches are skipped.
     *
     * @throws PXException wrapping any exception raised by the client
     */
    private void sendAsync(List<Activity> activitiesToSend) throws PXException {
        logTime("sendAsync", () -> {
            if (activitiesToSend == null || activitiesToSend.isEmpty()) {
                return;
            }
            try {
                client.sendBatchActivities(activitiesToSend);
            } catch (Exception e) {
                throw new PXException(e);
            }
        });
    }

    /**
     * Removes up to {@code maxBufferLength + BATCH_HEADROOM} activities from the buffer and
     * lowers the counter by the number removed. Anything beyond the cap stays queued for the
     * next flush instead of being discarded.
     *
     * @return the drained activities, in queue order; possibly empty, never null
     */
    private List<Activity> flush() {
        // Floor of 1 keeps the drain making progress if the configured length is zero/negative
        // (or the sum overflows), so the queue cannot grow without bound.
        final int maxElements = Math.max(1, maxBufferLength + BATCH_HEADROOM);
        final List<Activity> batch = new ArrayList<>();
        Activity activity;
        while (batch.size() < maxElements && (activity = bufferedActivities.poll()) != null) {
            batch.add(activity);
        }
        // Subtract rather than reset: activities added during the drain keep their count.
        counter.addAndGet(-batch.size());
        return batch;
    }
}