Skip to content

Commit 8958253

Browse files
committed
Add support for Server-Sent Events
1 parent 0fb3adf commit 8958253

File tree

14 files changed

+737
-23
lines changed

14 files changed

+737
-23
lines changed

Changes

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,13 @@
11

2-
9.41 2025-05-13
2+
9.41 2025-07-03
3+
- Added EXPERIMENTAL support for Server-Sent Events.
4+
- Added EXPERIMENTAL module Mojo::SSE.
5+
- Added EXPERIMENTAL sse attribute to Test::Mojo.
6+
- Added EXPERIMENTAL get_sse_ok, post_sse_ok, sse_finish_ok, sse_finished_ok, sse_ok, sse_id_is, sse_id_isnt,
7+
sse_text_is, sse_text_isnt, sse_text_like, sse_text_unlike, sse_type_is and sse_type_isnt methods to Test::Mojo.
8+
- Added EXPERIMENTAL is_sse and write_sse methods to Mojo::Content.
9+
- Added EXPERIMENTAL write_sse method to Mojolicious::Controller.
10+
- Added EXPERIMENTAL sse event to Mojo::Content.
311

412
9.40 2025-05-12
513
- Added EXPERIMENTAL support for resumable file downloads.

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@
3030
* A powerful **web development toolkit**, that you can use for all kinds of applications, independently of the web
3131
framework.
3232
* Full stack HTTP and WebSocket client/server implementation with IPv6, TLS, SNI, IDNA, HTTP/SOCKS5 proxy, UNIX
33-
domain socket, Comet (long polling), Promises/A+, async/await, keep-alive, connection pooling, timeout, cookie,
34-
multipart, and gzip compression support.
33+
domain socket, Comet (long polling), Server-Sent Events (SSE), Promises/A+, async/await, keep-alive, connection
34+
pooling, timeout, cookie, multipart, and gzip compression support.
3535
* Built-in non-blocking I/O web server, supporting multiple event loops as well as optional pre-forking and hot
3636
deployment, perfect for building highly scalable web services.
3737
* JSON and HTML/XML parser with CSS selector support.

examples/responses.pl

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,4 +27,10 @@
2727
die 'Hello World!';
2828
};
2929

30+
get '/res6' => sub ($c) {
31+
$c->write_sse({text => 'hello'});
32+
$c->write_sse({text => 'world'});
33+
$c->finish;
34+
};
35+
3036
app->start;

lib/Mojo/Content.pm

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use Mojo::Base 'Mojo::EventEmitter';
44
use Carp qw(croak);
55
use Compress::Raw::Zlib qw(WANT_GZIP Z_STREAM_END);
66
use Mojo::Headers;
7+
use Mojo::SSE qw(build_event parse_event);
78
use Scalar::Util qw(looks_like_number);
89

910
has [qw(auto_decompress auto_relax relaxed skip_body)];
@@ -62,6 +63,8 @@ sub is_multipart {undef}
6263

6364
sub is_parsing_body { (shift->{state} // '') eq 'body' }
6465

66+
sub is_sse { (shift->headers->content_type // '') eq 'text/event-stream' }
67+
6568
sub leftovers { shift->{buffer} }
6669

6770
sub parse {
@@ -73,11 +76,14 @@ sub parse {
7376

7477
# Chunked content
7578
$self->{real_size} //= 0;
76-
if ($self->is_chunked && $self->{state} ne 'headers') {
79+
if ($self->is_chunked) {
7780
$self->_parse_chunked;
7881
$self->{state} = 'finished' if ($self->{chunk_state} // '') eq 'finished';
7982
}
8083

84+
# SSE
85+
elsif ($self->is_sse) { $self->_parse_sse }
86+
8187
# Not chunked, pass through to second buffer
8288
else {
8389
$self->{real_size} += length $self->{pre_buffer};
@@ -158,6 +164,16 @@ sub write_chunk {
158164
return $self;
159165
}
160166

167+
sub write_sse {
168+
my ($self, $event, $cb) = @_;
169+
170+
$self->headers->content_type('text/event-stream') unless $self->{sse};
171+
$self->{sse} = 1;
172+
173+
return $self->write unless defined $event;
174+
return $self->write(build_event($event), $cb);
175+
}
176+
161177
sub _build_chunk {
162178
my ($self, $chunk) = @_;
163179

@@ -253,6 +269,20 @@ sub _parse_headers {
253269
$self->{header_size} = $self->{raw_size} - length $leftovers;
254270
}
255271

272+
sub _parse_sse {
273+
my $self = shift;
274+
275+
# Connection established
276+
$self->emit('sse') unless $self->{sse};
277+
$self->{sse} = 1;
278+
279+
# Parse SSE
280+
while (my $event = parse_event(\$self->{pre_buffer})) { $self->emit(sse => $event) }
281+
282+
# Check buffer size
283+
@$self{qw(state limit)} = ('finished', 1) if length($self->{pre_buffer} // '') > $self->max_buffer_size;
284+
}
285+
256286
sub _parse_until_body {
257287
my ($self, $chunk) = @_;
258288

@@ -319,6 +349,18 @@ Emitted when a new chunk of content arrives.
319349
say "Streaming: $bytes";
320350
});
321351
352+
=head2 sse
353+
354+
$content->on(sse => sub ($content, $event) {...});
355+
356+
Emitted when a new Server-Sent Event (SSE) connection has been established and for each new event that arrives. Note
357+
that this event is B<EXPERIMENTAL> and may change without warning!
358+
359+
$content->on(sse => sub ($content, $event) {
360+
if ($event) { say "Type: $event->{type}, Data: $event->{data}" }
361+
else { say "SSE connection established" }
362+
});
363+
322364
=head1 ATTRIBUTES
323365
324366
L<Mojo::Content> implements the following attributes.
@@ -480,6 +522,13 @@ False, this is not a L<Mojo::Content::MultiPart> object.
480522
481523
Check if body parsing started yet.
482524
525+
=head2 is_sse
526+
527+
my $bool = $content->is_sse;
528+
529+
Check if C<Content-Type> header indicates Server-Sent Events (SSE). Note that this method is B<EXPERIMENTAL> and may
530+
change without warning!
531+
483532
=head2 leftovers
484533
485534
my $bytes = $content->leftovers;
@@ -541,6 +590,16 @@ dynamic content to be written later. You can write an empty chunk of data at any
541590
});
542591
});
543592
593+
=head2 write_sse
594+
595+
$content = $content->write_sse;
596+
$content = $content->write_sse($event);
597+
$content = $content->write_sse($event => sub {...});
598+
599+
Write Server-Sent Event (SSE) non-blocking, the optional drain callback will be executed once all data has been
600+
written. Calling this method without an event will finalize the response headers and allow for events to be written
601+
later. Note that this method is B<EXPERIMENTAL> and may change without warning!
602+
544603
=head1 SEE ALSO
545604
546605
L<Mojolicious>, L<Mojolicious::Guides>, L<https://mojolicious.org>.

lib/Mojo/SSE.pm

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package Mojo::SSE;
2+
use Mojo::Base -strict;
3+
4+
use Carp qw(croak);
5+
use Exporter qw(import);
6+
use Mojo::Util qw(decode encode);
7+
8+
our @EXPORT_OK = (qw(build_event parse_event));
9+
10+
sub build_event {
11+
my $event = shift;
12+
13+
my $data = $event->{text} // '';
14+
croak 'Event data cannot contain newlines or linefeeds' if index($data, "\x0d") >= 0 || index($data, "\x0a") >= 0;
15+
16+
my @parts = defined $event->{type} ? ("event: $event->{type}") : ();
17+
push @parts, "data: $data";
18+
push @parts, "id: $event->{id}" if defined $event->{id};
19+
return encode('UTF-8', join("\x0d\x0a", @parts, '', ''));
20+
}
21+
22+
sub parse_event {
23+
my $buffer = shift;
24+
my $event = {id => undef, type => 'message', text => ''};
25+
26+
while ($$buffer =~ s/^(.*?)(?:(?:\x0d\x0a|(?<!\x0d)\x0a|\x0d(?!\x0a)){2})//s) {
27+
28+
# Skip lines with encoding errors
29+
next unless defined(my $lines = decode 'UTF-8', $1);
30+
31+
# Skip comments
32+
next if $lines =~ /^\s*:/;
33+
34+
my $first = 0;
35+
for my $line (split /(?:\x0d\x0a|(?<!\x0d)\x0a|\x0d(?!\x0a))/, $lines) {
36+
if ($line =~ /^event(?::\s*(\S.*))?$/) { $event->{type} = $1 // 'message' }
37+
elsif ($line =~ /^data(?::\s*(.*))?$/) { $event->{text} .= ($first++ ? "\n" : '') . ($1 // '') }
38+
elsif ($line =~ /^id(?::\s*(.*))?$/) { $event->{id} = $1 }
39+
}
40+
41+
return $event;
42+
}
43+
44+
return undef;
45+
}
46+
47+
1;
48+
49+
=encoding utf8
50+
51+
=head1 NAME
52+
53+
Mojo::SSE - Server-Sent Events
54+
55+
=head1 SYNOPSIS
56+
57+
use Mojo::SSE qw(build_event parse_event);
58+
59+
=head1 DESCRIPTION
60+
61+
L<Mojo::SSE> implements the Server-Sent Events protocol. Note that this module is B<EXPERIMENTAL> and may change
62+
without warning!
63+
64+
=head1 FUNCTIONS
65+
66+
L<Mojo::SSE> implements the following functions, which can be imported individually.
67+
68+
=head2 build_event
69+
70+
my $bytes = build_event $event, $chars;
71+
72+
Build Server-Sent Event.
73+
74+
=head2 parse_event
75+
76+
my $event = parse_event \$bytes;
77+
78+
Parse Server-Sent Event. Returns C<undef> if no complete event was found.
79+
80+
=head1 SEE ALSO
81+
82+
L<Mojolicious>, L<Mojolicious::Guides>, L<https://mojolicious.org>.
83+
84+
=cut

lib/Mojo/UserAgent/Transactor.pm

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,9 +144,11 @@ sub redirect {
144144
$headers->remove($_) for grep {/^content-/i} @{$headers->names};
145145
}
146146

147-
$new->res->content->auto_decompress(0) unless $self->compressed;
147+
my $content = $new->res->content;
148+
$content->auto_decompress(0) unless $self->compressed;
148149
my $headers = $new->req->url($location)->headers;
149150
$headers->remove($_) for qw(Authorization Cookie Host Referer);
151+
if ($res->content->has_subscribers('sse')) { $content->on(sse => $_) for @{$res->content->subscribers('sse')} }
150152

151153
return $new->previous($old);
152154
}

lib/Mojolicious/Controller.pm

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,12 @@ sub write_chunk {
329329
return $self->rendered;
330330
}
331331

332+
sub write_sse {
333+
my ($self, $event, $cb) = @_;
334+
$self->res->content->write_sse($event, $cb ? sub { shift; $self->$cb(@_) } : ());
335+
return $self->rendered;
336+
}
337+
332338
1;
333339

334340
=encoding utf8
@@ -920,6 +926,21 @@ You can call L</"finish"> or write an empty chunk of data at any time to end the
920926
o!
921927
0
922928
929+
=head2 write_sse
930+
931+
$c = $c->write_sse;
932+
$c = $c->write_sse($event);
933+
$c = $c->write_sse($event => sub ($c) {...});
934+
935+
Write Server-Sent Event (SSE) non-blocking, the optional drain callback will be executed once all data has been
936+
written. Calling this method without an event will finalize the response headers and allow for events to be written
937+
later. Note that this method is B<EXPERIMENTAL> and might change without warning!
938+
939+
# Make sure previous event has been written before continuing (id and type are optional)
940+
$c->write_sse({id => 23, type => 'greeting', text => 'Hello World!'} => sub ($c) {
941+
$c->write_sse({id => 24, type => 'farewell', text => 'Goodbye World!'} => sub ($c) { $c->finish });
942+
});
943+
923944
=head1 HELPERS
924945
925946
In addition to the L</"ATTRIBUTES"> and L</"METHODS"> above you can also call helpers provided by L</"app"> on

lib/Mojolicious/Guides/Cookbook.pod

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -998,12 +998,12 @@ result in much better performance, but also increases memory usage by up to 300K
998998

999999
You can also use L<Mojo::Transaction::WebSocket/"with_protocols"> to negotiate a subprotocol.
10001000

1001-
=head2 EventSource web service
1001+
=head2 Server-Sent Events web service
10021002

1003-
EventSource is a special form of long polling where you can use L<Mojolicious::Controller/"write"> to directly send DOM
1004-
events from servers to clients. It is uni-directional, that means you will have to use Ajax requests for sending data
1005-
from clients to servers, the advantage however is low infrastructure requirements, since it reuses the HTTP protocol
1006-
for transport.
1003+
Server-Sent Events (SSE) is a special form of long polling where you can use L<Mojolicious::Controller/"write_sse"> to
1004+
directly send DOM events from servers to clients. It is uni-directional, that means you will have to use Ajax requests
1005+
for sending data from clients to servers, the advantage however is low infrastructure requirements, since it reuses the
1006+
HTTP protocol for transport.
10071007

10081008
use Mojolicious::Lite -signatures;
10091009

@@ -1017,12 +1017,11 @@ for transport.
10171017
$c->inactivity_timeout(300);
10181018

10191019
# Change content type and finalize response headers
1020-
$c->res->headers->content_type('text/event-stream');
1021-
$c->write;
1020+
$c->write_sse;
10221021

10231022
# Subscribe to "message" event and forward "log" events to browser
10241023
my $cb = $c->app->log->on(message => sub ($log, $level, @lines) {
1025-
$c->write("event:log\ndata: [$level] @lines\n\n");
1024+
$c->write_sse({type => 'log', text => "[$level] @lines"});
10261025
});
10271026

10281027
# Unsubscribe from "message" event again once we are done

lib/Mojolicious/Types.pm

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ has mapping => sub {
2525
pdf => ['application/pdf'],
2626
png => ['image/png'],
2727
rss => ['application/rss+xml'],
28+
sse => ['text/event-stream'],
2829
svg => ['image/svg+xml'],
2930
ttf => ['font/ttf'],
3031
txt => ['text/plain;charset=UTF-8'],
@@ -114,6 +115,7 @@ L<Mojolicious::Types> manages MIME types for L<Mojolicious>.
114115
pdf -> application/pdf
115116
png -> image/png
116117
rss -> application/rss+xml
118+
sse -> text/event-stream
117119
svg -> image/svg+xml
118120
ttf -> font/ttf
119121
txt -> text/plain;charset=UTF-8

0 commit comments

Comments
 (0)