Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 100 additions & 49 deletions Sources/EventSource/EventSource.swift
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ public struct EventSource: Sendable {
private let eventParser: @Sendable () -> EventParser

public var timeoutInterval: TimeInterval
public var maxReconnectAttempts: Int
public var reconnectInitialDelay: TimeInterval
public var reconnectBackoffFactor: Double

public init(mode: Mode = .default, timeoutInterval: TimeInterval = 300) {
self.init(mode: mode, eventParser: ServerEventParser(mode: mode), timeoutInterval: timeoutInterval)
Expand All @@ -51,18 +54,27 @@ public struct EventSource: Sendable {
public init(
mode: Mode = .default,
eventParser: @autoclosure @escaping @Sendable () -> EventParser,
timeoutInterval: TimeInterval = 300
timeoutInterval: TimeInterval = 300,
maxReconnectAttempts: Int = 5,
reconnectInitialDelay: TimeInterval = 1.0,
reconnectBackoffFactor: Double = 2.0
) {
self.mode = mode
self.eventParser = eventParser
self.timeoutInterval = timeoutInterval
self.maxReconnectAttempts = maxReconnectAttempts
self.reconnectInitialDelay = reconnectInitialDelay
self.reconnectBackoffFactor = reconnectBackoffFactor
}

public func dataTask(for urlRequest: URLRequest) -> DataTask {
DataTask(
urlRequest: urlRequest,
eventParser: eventParser(),
timeoutInterval: timeoutInterval
timeoutInterval: timeoutInterval,
maxReconnectAttempts: maxReconnectAttempts,
reconnectInitialDelay: reconnectInitialDelay,
reconnectBackoffFactor: reconnectBackoffFactor
)
}
}
Expand All @@ -74,8 +86,76 @@ public extension EventSource {
/// ``EventSource/EventSource/dataTask(for:)`` method on the EventSource instance. After creating a task,
/// it can be started by iterating event stream returned by ``DataTask/events()``.
final class DataTask: Sendable {
/// Initializes or reinitializes the SSE session
private func startSession(stream continuation: AsyncStream<EventType>.Continuation) {
let sessionDelegate = SessionDelegate()
let urlSession = URLSession(
configuration: urlSessionConfiguration,
delegate: sessionDelegate,
delegateQueue: nil
)
let urlSessionDataTask = urlSession.dataTask(with: urlRequest)
let sessionDelegateTask = Task { [weak self] in
for await event in sessionDelegate.eventStream {
guard let self else { return }
switch event {
case let .didCompleteWithError(error):
self.handleSessionError(error, stream: continuation, urlSession: urlSession)
case let .didReceiveResponse(response, completionHandler):
self.handleSessionResponse(
response,
stream: continuation,
urlSession: urlSession,
completionHandler: completionHandler
)
case let .didReceiveData(data):
self.parseMessages(from: data, stream: continuation, urlSession: urlSession)
}
}
}
#if compiler(>=6.0)
continuation.onTermination = { @Sendable [weak self] _ in
sessionDelegateTask.cancel()
Task { self?.close(stream: continuation, urlSession: urlSession) }
}
#else
continuation.onTermination = { @Sendable _ in
sessionDelegateTask.cancel()
Task { [weak self] in
await self?.close(stream: continuation, urlSession: urlSession)
}
}
#endif

urlSessionDataTask.resume()
}

/// Helper method for reconnection
private func attemptReconnect(stream continuation: AsyncStream<EventType>.Continuation) {
let delay = reconnectInitialDelay * pow(reconnectBackoffFactor, Double(reconnectAttempts - 1))
DispatchQueue.global().asyncAfter(deadline: .now() + delay) { [weak self] in
self?.startSession(stream: continuation)
self?.readyState = .connecting
self?.consumed = true
}
}
private let _readyState: Mutex<ReadyState> = Mutex(.none)

// Reconnection properties
private let maxReconnectAttempts: Int
private let reconnectInitialDelay: TimeInterval
private let reconnectBackoffFactor: Double

private let _reconnectAttempts: Mutex<Int> = Mutex(0)
private var reconnectAttempts: Int {
get {
_reconnectAttempts.withLock { $0 }
}
set {
_reconnectAttempts.withLock { $0 = newValue }
}
}

/// A value representing the state of the connection.
public var readyState: ReadyState {
get {
Expand Down Expand Up @@ -153,11 +233,17 @@ public extension EventSource {
internal init(
urlRequest: URLRequest,
eventParser: EventParser,
timeoutInterval: TimeInterval
timeoutInterval: TimeInterval,
maxReconnectAttempts: Int,
reconnectInitialDelay: TimeInterval,
reconnectBackoffFactor: Double
) {
self.urlRequest = urlRequest
self._eventParser = Mutex(eventParser)
self.timeoutInterval = timeoutInterval
self.maxReconnectAttempts = maxReconnectAttempts
self.reconnectInitialDelay = reconnectInitialDelay
self.reconnectBackoffFactor = reconnectBackoffFactor
}

/// Creates and returns event stream.
Expand All @@ -170,49 +256,7 @@ public extension EventSource {
}

return AsyncStream { continuation in
let sessionDelegate = SessionDelegate()
let urlSession = URLSession(
configuration: urlSessionConfiguration,
delegate: sessionDelegate,
delegateQueue: nil
)
let urlSessionDataTask = urlSession.dataTask(with: urlRequest)

let sessionDelegateTask = Task { [weak self] in
for await event in sessionDelegate.eventStream {
guard let self else { return }

switch event {
case let .didCompleteWithError(error):
handleSessionError(error, stream: continuation, urlSession: urlSession)
case let .didReceiveResponse(response, completionHandler):
handleSessionResponse(
response,
stream: continuation,
urlSession: urlSession,
completionHandler: completionHandler
)
case let .didReceiveData(data):
parseMessages(from: data, stream: continuation, urlSession: urlSession)
}
}
}

#if compiler(>=6.0)
continuation.onTermination = { @Sendable [weak self] _ in
sessionDelegateTask.cancel()
Task { self?.close(stream: continuation, urlSession: urlSession) }
}
#else
continuation.onTermination = { @Sendable _ in
sessionDelegateTask.cancel()
Task { [weak self] in
await self?.close(stream: continuation, urlSession: urlSession)
}
}
#endif

urlSessionDataTask.resume()
startSession(stream: continuation)
readyState = .connecting
consumed = true
}
Expand All @@ -232,9 +276,15 @@ public extension EventSource {
if let error {
sendErrorEvent(with: error, stream: continuation)
}

// Close connection
close(stream: continuation, urlSession: urlSession)

// Attempts to reconnect if the limit has not been exceeded
if reconnectAttempts < maxReconnectAttempts {
reconnectAttempts += 1
attemptReconnect(stream: continuation)
} else {
// Close connection if attempts exceeded
close(stream: continuation, urlSession: urlSession)
}
}

private func handleSessionResponse(
Expand Down Expand Up @@ -312,6 +362,7 @@ public extension EventSource {

private func setOpen(stream continuation: AsyncStream<EventType>.Continuation) {
readyState = .open
reconnectAttempts = 0 // reset attempts when opening
continuation.yield(.open)
}

Expand Down
Loading