Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,20 @@ public enum EventOutcome {
/**
* Student updated event outcome.
*/
GRAD_STATUS_UPDATED
GRAD_STATUS_UPDATED,
VALIDATION_SUCCESS_NO_ERROR_WARNING,
VALIDATION_SUCCESS_WITH_ERROR,
PEN_MATCH_PROCESSED,
GRAD_STATUS_FETCHED,
GRAD_STATUS_RESULTS_PROCESSED,
PEN_MATCH_RESULTS_PROCESSED,
READ_FROM_TOPIC_SUCCESS,
INITIATE_SUCCESS,
SAGA_COMPLETED,
ENROLLED_PROGRAMS_WRITTEN,
ADDITIONAL_STUDENT_ATTRIBUTES_CALCULATED,
STUDENTS_ARCHIVED,
BATCH_API_NOTIFIED,
FAILED_TO_START_ARCHIVE_STUDENTS_SAGA,
ARCHIVE_STUDENTS_STARTED
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,19 @@
public enum EventType {
GRAD_STUDENT_GRADUATED,
GRAD_STUDENT_UNDO_COMPLETION,
GRAD_STUDENT_UPDATED
GRAD_STUDENT_UPDATED,
VALIDATE_SDC_STUDENT,
PROCESS_PEN_MATCH,
FETCH_GRAD_STATUS,
PROCESS_GRAD_STATUS_RESULT,
PROCESS_PEN_MATCH_RESULTS,
READ_FROM_TOPIC,
INITIATED,
MARK_SAGA_COMPLETE,
GET_PAGINATED_SCHOOLS,
WRITE_ENROLLED_PROGRAMS,
CALCULATE_ADDITIONAL_STUDENT_ATTRIBUTES,
ARCHIVE_STUDENTS,
NOTIFY_ARCHIVE_STUDENT_BATCH_COMPLETED,
ARCHIVE_STUDENTS_REQUEST
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package ca.bc.gov.educ.api.gradstudent.constant;

public enum SagaEnum {
ARCHIVE_STUDENTS_SAGA
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package ca.bc.gov.educ.api.gradstudent.constant;

/**
* The enum Saga status enum.
*/
public enum SagaStatusEnum {
/**
* Started saga status enum.
*/
STARTED,
/**
* In progress saga status enum.
*/
IN_PROGRESS,
/**
* Completed saga status enum.
*/
COMPLETED,
/**
* Force stopped saga status enum.
*/
FORCE_STOPPED
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package ca.bc.gov.educ.api.gradstudent.constant;

import lombok.Getter;

import java.util.Arrays;
import java.util.Objects;
import java.util.Optional;

@Getter
public enum StudentStatusCodes {
CURRENT("CUR"),
ARCHIVED("ARC"),
DECEASED("DEC"),
MERGED("MER"),
TERMINATED("TER");

private final String code;
StudentStatusCodes(String code) {
this.code = code;
}

public static Optional<StudentStatusCodes> findByValue(String value) {
return Arrays.stream(values()).filter(e -> Objects.equals(e.code, value)).findFirst();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,8 @@ public enum Topics {
* GradStatus events topic.
*/
GRAD_STATUS_EVENT_TOPIC,
GRAD_STUDENT_API_FETCH_GRAD_STATUS_TOPIC
GRAD_STUDENT_API_FETCH_GRAD_STATUS_TOPIC,
GRAD_STUDENT_API_TOPIC,
GRAD_STUDENT_ARCHIVE_STUDENTS_SAGA_TOPIC,
GRAD_BATCH_API_TOPIC,
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package ca.bc.gov.educ.api.gradstudent.messaging;

import io.nats.client.Connection;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.util.Optional;

@Component
@Slf4j
public class MessagePublisher {
private final Connection connection;

@Autowired
public MessagePublisher(final Connection con) {
this.connection = con;
}

public void dispatchMessage(final String subject, final byte[] message) {
this.connection.publish(subject, message);
}

public Optional<String> requestMessage(final String subject, final byte[] message) throws InterruptedException {
log.debug("requesting from NATS on topic :: {} with payload :: {}", subject, new String(message));
val response = this.connection.request(subject, message, Duration.ofSeconds(30)).getData();
if (response == null || response.length == 0) {
return Optional.empty();
}
val responseValue = new String(response);
log.debug("got response from NATS :: {}", responseValue);
return Optional.of(responseValue);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package ca.bc.gov.educ.api.gradstudent.messaging;

import ca.bc.gov.educ.api.gradstudent.model.dc.Event;
import ca.bc.gov.educ.api.gradstudent.orchestrator.base.EventHandler;
import ca.bc.gov.educ.api.gradstudent.service.events.EventHandlerDelegatorService;
import ca.bc.gov.educ.api.gradstudent.util.EducGradStudentApiConstants;
import ca.bc.gov.educ.api.gradstudent.util.JsonUtil;
import ca.bc.gov.educ.api.gradstudent.util.LogHelper;
import io.nats.client.Connection;
import io.nats.client.Message;
import io.nats.client.MessageHandler;
import jakarta.annotation.PostConstruct;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static ca.bc.gov.educ.api.gradstudent.constant.Topics.GRAD_STUDENT_API_TOPIC;
import static lombok.AccessLevel.PRIVATE;

@Component
@Slf4j
public class MessageSubscriber {

@Getter(PRIVATE)
private final Map<String, EventHandler> handlerMap = new HashMap<>();
@Getter(PRIVATE)
private final EventHandlerDelegatorService eventHandlerDelegatorService;
private final Connection connection;
private final EducGradStudentApiConstants constants;

@Autowired
public MessageSubscriber(final Connection con, final List<EventHandler> eventHandlers, final EducGradStudentApiConstants constants, final EventHandlerDelegatorService eventHandlerDelegatorService) {
this.eventHandlerDelegatorService = eventHandlerDelegatorService;
this.connection = con;
eventHandlers.forEach(handler -> {
this.handlerMap.put(handler.getTopicToSubscribe(), handler);
this.subscribeForSAGA(handler.getTopicToSubscribe(), handler);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see some redundant codes in this class.
handlerMap is to handle multiple subscribers. I think subscribe() method below could be enhanced to use handlerMap so that onMessage could handle messages from multiple topics instead of having the separate onMessageForSAGA.

});
this.constants = constants;
}

@PostConstruct
public void subscribe() {
final String queue = GRAD_STUDENT_API_TOPIC.toString().replace("_", "-");
final var dispatcher = this.connection.createDispatcher(this.onMessage());
dispatcher.subscribe(GRAD_STUDENT_API_TOPIC.toString(), queue);
}

public MessageHandler onMessage() {
return (Message message) -> {
if (message != null) {
log.info("Message received subject :: {}, replyTo :: {}, subscriptionID :: {}", message.getSubject(), message.getReplyTo(), message.getSID());
try {
final var eventString = new String(message.getData());
LogHelper.logMessagingEventDetails(eventString, constants.isSplunkLogHelperEnabled());
final var event = JsonUtil.getJsonObjectFromString(Event.class, eventString);
eventHandlerDelegatorService.handleEvent(event, message);
} catch (final Exception e) {
log.error("Exception ", e);
}
}
};
}

private static MessageHandler onMessageForSAGA(final EventHandler eventHandler) {
return (Message message) -> {
if (message != null) {
log.info("Message received subject :: {}, replyTo :: {}, subscriptionID :: {}", message.getSubject(), message.getReplyTo(), message.getSID());
try {
final var eventString = new String(message.getData());
final var event = JsonUtil.getJsonObjectFromString(Event.class, eventString);
eventHandler.handleEvent(event);
} catch (final Exception e) {
log.error("Exception ", e);
}
}
};
}

private void subscribeForSAGA(final String topic, final EventHandler eventHandler) {
this.handlerMap.computeIfAbsent(topic, k -> eventHandler);
final String queue = topic.replace("_", "-");
final var dispatcher = this.connection.createDispatcher(MessageSubscriber.onMessageForSAGA(eventHandler));
dispatcher.subscribe(topic, queue);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package ca.bc.gov.educ.api.gradstudent.model.dc;

import ca.bc.gov.educ.api.gradstudent.constant.EventOutcome;
import ca.bc.gov.educ.api.gradstudent.constant.EventType;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.AllArgsConstructor;
import lombok.Builder;
Expand Down Expand Up @@ -36,13 +38,6 @@ public class Event {
/**
* The Event payload.
*/
private String eventPayload; // json string
/**
* The school batch ID
*/
private String sdcSchoolBatchID;
/**
* The student ID
*/
private String sdcSchoolStudentID;
private String eventPayload;
private String batchId;
}

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package ca.bc.gov.educ.api.gradstudent.model.dc;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.Data;
import lombok.EqualsAndHashCode;

@EqualsAndHashCode(callSuper = true)
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class NotificationEvent extends Event {
private String sagaStatus;
private String sagaName;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package ca.bc.gov.educ.api.gradstudent.model.dto;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.List;

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
public class ArchiveStudentsSagaData {
List<String> schoolsOfRecords;
long batchId;
String updateUser;
String studentStatusCode;
}
Loading
Loading