Skip to content

Commit 11d0358

Browse files
committed
wip: cadente/chunked, stability improvements, covering 63/95
1 parent 233704b commit 11d0358

File tree

11 files changed

+216
-134
lines changed

11 files changed

+216
-134
lines changed

cadente/Sisk.Cadente.CoreEngine/CadenteHttpServerEngineResponse.cs

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,7 @@
88
// File name: CadenteHttpServerEngineResponse.cs
99
// Repository: https://github.com/sisk-http/core
1010

11-
using System;
12-
using System.Collections.Generic;
1311
using System.Globalization;
14-
using System.Linq;
15-
using System.Net;
16-
using System.Text;
17-
using System.Threading.Tasks;
1812
using Sisk.Core.Http;
1913
using Sisk.Core.Http.Engine;
2014

@@ -27,7 +21,8 @@ public sealed class CadenteHttpServerEngineResponse : HttpServerEngineContextRes
2721
private readonly HttpHostContext.HttpResponse _response;
2822
private readonly HttpHostContext _httpHostContext;
2923
private CadenteHttpServerEngineContext? _context;
30-
private WebHeaderCollection _headers = new ();
24+
private Lazy<Stream> _outputStream;
25+
private CadenteEngineHeaderList _headers;
3126

3227
internal void SetContext ( CadenteHttpServerEngineContext context ) {
3328
_context = context;
@@ -41,6 +36,8 @@ internal void SetContext ( CadenteHttpServerEngineContext context ) {
4136
public CadenteHttpServerEngineResponse ( HttpHostContext.HttpResponse response, HttpHostContext httpHostContext ) {
4237
_response = response;
4338
_httpHostContext = httpHostContext;
39+
_outputStream = new Lazy<Stream> ( () => _response.GetResponseStream ( SendChunked ) );
40+
_headers = new CadenteEngineHeaderList ( _response );
4441
}
4542

4643
/// <inheritdoc/>
@@ -50,14 +47,11 @@ public CadenteHttpServerEngineResponse ( HttpHostContext.HttpResponse response,
5047
/// <inheritdoc/>
5148
public override bool KeepAlive { get => _httpHostContext.KeepAlive; set => _httpHostContext.KeepAlive = value; }
5249
/// <inheritdoc/>
53-
public override bool SendChunked { get => _response.SendChunked; set => _response.SendChunked = value; }
50+
public override bool SendChunked { get; set; }
5451
/// <inheritdoc/>
5552
public override long ContentLength64 {
5653
get {
57-
if (_response.ResponseStream is { CanSeek: true }) {
58-
return _response.ResponseStream.Length;
59-
}
60-
else if (_response.Headers.FirstOrDefault ( h => h.Name.Equals ( HttpKnownHeaderNames.ContentLength, StringComparison.OrdinalIgnoreCase ) ) is { IsEmpty: false } contentLenHeader) {
54+
if (_response.Headers.FirstOrDefault ( h => h.Name.Equals ( HttpKnownHeaderNames.ContentLength, StringComparison.OrdinalIgnoreCase ) ) is { IsEmpty: false } contentLenHeader) {
6155
return long.Parse ( contentLenHeader.Value );
6256
}
6357
else {
@@ -94,10 +88,10 @@ public override string? ContentType {
9488
}
9589

9690
/// <inheritdoc/>
97-
public override Stream OutputStream => _response.GetResponseStream ();
91+
public override Stream OutputStream => _outputStream.Value;
9892

9993
/// <inheritdoc/>
100-
public override IHttpEngineHeaderList Headers => new CadenteEngineHeaderList ( _response );
94+
public override IHttpEngineHeaderList Headers => _headers;
10195

10296
/// <inheritdoc/>
10397
public override void Abort () {
@@ -111,14 +105,16 @@ public override void AppendHeader ( string name, string value ) {
111105

112106
/// <inheritdoc/>
113107
public override void Close () {
114-
_context?.CompleteProcessing ();
115-
_response.ResponseStream?.Close ();
108+
Dispose ();
116109
}
117110

118111
/// <inheritdoc/>
119112
public override void Dispose () {
113+
if (_outputStream.IsValueCreated) {
114+
_outputStream.Value.Dispose ();
115+
}
116+
120117
_context?.CompleteProcessing ();
121-
_response.ResponseStream?.Dispose ();
122118
}
123119
}
124120
}

cadente/Sisk.Cadente/HttpConnection.cs

Lines changed: 20 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99

1010
using System.Net;
1111
using Sisk.Cadente.HttpSerializer;
12-
using Sisk.Cadente.Streams;
1312
using Sisk.Core.Http;
1413

1514
namespace Sisk.Cadente;
@@ -44,57 +43,27 @@ public async Task<HttpConnectionState> HandleConnectionEvents () {
4443
while (_connectionStream.CanRead && !disposedValue) {
4544

4645
HttpRequestReader requestReader = new HttpRequestReader ( _connectionStream );
47-
Stream? responseStream = null;
48-
49-
try {
50-
if (requestReader.TryReadHttpRequest ( out HttpRequestBase? nextRequest ) == false) {
51-
return HttpConnectionState.ConnectionClosed;
52-
}
53-
54-
HttpHostContext managedSession = new HttpHostContext ( _host, nextRequest, _client, _connectionStream );
55-
await _host.InvokeContextCreated ( managedSession );
56-
57-
if (!managedSession.KeepAlive || !nextRequest.CanKeepAlive) {
58-
connectionCloseRequested = true;
59-
managedSession.Response.Headers.Set ( new HttpHeader ( HttpHeaderName.Connection, "close" ) );
60-
}
61-
62-
if (managedSession.Response.ResponseStream is Stream { } s) {
63-
responseStream = s;
64-
}
65-
else {
66-
managedSession.Response.Headers.Set ( new HttpHeader ( HttpHeaderName.ContentLength, "0" ) );
67-
}
68-
69-
Stream outputStream = _connectionStream;
70-
if (responseStream is not null) {
71-
72-
if (managedSession.Response.SendChunked || !responseStream.CanSeek) {
73-
74-
managedSession.Response.Headers.Set ( new HttpHeader ( HttpHeaderName.TransferEncoding, "chunked" ) );
75-
responseStream = new HttpChunkedStream ( responseStream );
76-
}
77-
else {
78-
managedSession.Response.Headers.Set ( new HttpHeader ( HttpHeaderName.ContentLength, responseStream.Length.ToString () ) );
79-
}
80-
}
81-
82-
if (managedSession.ResponseHeadersAlreadySent == false && !managedSession.WriteHttpResponseHeaders ()) {
83-
return HttpConnectionState.ResponseWriteException;
84-
}
85-
86-
if (responseStream is not null) {
87-
await responseStream.CopyToAsync ( outputStream );
88-
}
89-
90-
await _connectionStream.FlushAsync ();
91-
92-
if (connectionCloseRequested) {
93-
break;
94-
}
46+
47+
if (requestReader.TryReadHttpRequest ( out HttpRequestBase? nextRequest ) == false) {
48+
return HttpConnectionState.ConnectionClosed;
49+
}
50+
51+
HttpHostContext managedSession = new HttpHostContext ( _host, nextRequest, _client, _connectionStream );
52+
await _host.InvokeContextCreated ( managedSession );
53+
54+
if (!managedSession.KeepAlive || !nextRequest.CanKeepAlive) {
55+
connectionCloseRequested = true;
56+
managedSession.Response.Headers.Set ( new HttpHeader ( HttpHeaderName.Connection, "close" ) );
9557
}
96-
finally {
97-
responseStream?.Dispose ();
58+
59+
if (managedSession.ResponseHeadersAlreadySent == false && !managedSession.WriteHttpResponseHeaders ()) {
60+
return HttpConnectionState.ResponseWriteException;
61+
}
62+
63+
await _connectionStream.FlushAsync ();
64+
65+
if (connectionCloseRequested) {
66+
break;
9867
}
9968
}
10069

cadente/Sisk.Cadente/HttpHeaderExtensions.cs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,4 +36,20 @@ public static void Set ( this List<HttpHeader> headers, in HttpHeader header ) {
3636
headers.Add ( header );
3737
}
3838
}
39+
40+
/// <summary>
41+
/// Removes all <see cref="HttpHeader"/> with the given name from the list. Thread-safe.
42+
/// </summary>
43+
/// <param name="headers">The list of <see cref="HttpHeader"/> to modify.</param>
44+
/// <param name="headerName">The name of the header to remove.</param>
45+
public static void Remove ( this List<HttpHeader> headers, string headerName ) {
46+
lock (((ICollection) headers).SyncRoot) {
47+
var span = CollectionsMarshal.AsSpan ( headers );
48+
for (int i = span.Length - 1; i >= 0; i--) {
49+
if (Ascii.EqualsIgnoreCase ( span [ i ].NameBytes.Span, headerName )) {
50+
headers.RemoveAt ( i );
51+
}
52+
}
53+
}
54+
}
3955
}

cadente/Sisk.Cadente/HttpHost.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ public void Start () {
8787
ObjectDisposedException.ThrowIf ( disposedValue, this );
8888

8989
_listener.Server.NoDelay = true;
90-
_listener.Server.LingerState = new LingerOption ( true, 0 );
90+
_listener.Server.LingerState = new LingerOption ( true, 3 );
9191
_listener.Server.ReceiveBufferSize = HttpConnection.REQUEST_BUFFER_SIZE;
9292
_listener.Server.SendBufferSize = HttpConnection.RESPONSE_BUFFER_SIZE;
9393

cadente/Sisk.Cadente/HttpHostContext.cs

Lines changed: 25 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
using System.Text;
1212
using Sisk.Cadente.HttpSerializer;
1313
using Sisk.Cadente.Streams;
14+
using Sisk.Core.Http;
1415

1516
namespace Sisk.Cadente;
1617

@@ -125,6 +126,7 @@ internal HttpRequest ( HttpRequestBase request, HttpRequestStream requestStream
125126
public sealed class HttpResponse {
126127
private Stream _baseOutputStream;
127128
private HttpHostContext _session;
129+
private bool headersSent = false;
128130

129131
/// <summary>
130132
/// Gets or sets the HTTP status code of the response.
@@ -141,17 +143,6 @@ public sealed class HttpResponse {
141143
/// </summary>
142144
public List<HttpHeader> Headers { get; set; }
143145

144-
/// <summary>
145-
/// Gets or sets an boolean indicating if this <see cref="HttpResponse"/> should be send in chunks or not.
146-
/// </summary>
147-
public bool SendChunked { get; set; }
148-
149-
// MUST SPECIFY ResponseStream OR ResponseBytes, NOT BOTH
150-
/// <summary>
151-
/// Gets or sets the stream for the response content.
152-
/// </summary>
153-
public Stream? ResponseStream { get; set; }
154-
155146
/// <summary>
156147
/// Asynchronously gets an event stream writer with UTF-8 encoding.
157148
/// </summary>
@@ -165,28 +156,45 @@ public sealed class HttpResponse {
165156
/// <returns>A task that represents the asynchronous operation, with a <see cref="HttpEventStreamWriter"/> as the result.</returns>
166157
/// <exception cref="InvalidOperationException">Thrown when unable to obtain an output stream for the response.</exception>
167158
public HttpEventStreamWriter GetEventStream ( Encoding encoding ) {
159+
if (headersSent) {
160+
throw new InvalidOperationException ( "Headers already sent for this HTTP response." );
161+
}
162+
168163
Headers.Set ( new HttpHeader ( "Content-Type", "text/event-stream" ) );
169164
Headers.Set ( new HttpHeader ( "Cache-Control", "no-cache" ) );
170165

171-
if (_session.WriteHttpResponseHeaders () == false) {
166+
if (!_session.WriteHttpResponseHeaders ()) {
172167
throw new InvalidOperationException ( "Unable to obtain the output stream for the response." );
173168
}
174169

170+
headersSent = true;
175171
return new HttpEventStreamWriter ( _baseOutputStream, encoding );
176172
}
177173

178174
/// <summary>
179-
/// Asynchronously gets the content stream for the response.
175+
/// Gets the content stream for the response.
180176
/// </summary>
181177
/// <returns>A task that represents the asynchronous operation, with the response content stream as the result.</returns>
182178
/// <exception cref="InvalidOperationException">Thrown when unable to obtain an output stream for the response.</exception>
183-
public Stream GetResponseStream () {
184-
if (_session.WriteHttpResponseHeaders () == false) {
179+
public Stream GetResponseStream ( bool chunked = false ) {
180+
if (headersSent) {
181+
throw new InvalidOperationException ( "Headers already sent for this HTTP response." );
182+
}
183+
184+
if (chunked) {
185+
Headers.Set ( new HttpHeader ( HttpHeaderName.TransferEncoding, "chunked" ) );
186+
Headers.Remove ( HttpHeaderName.ContentLength );
187+
}
188+
189+
if (!_session.WriteHttpResponseHeaders ()) {
185190
throw new InvalidOperationException ( "Unable to obtain an output stream for the response." );
186191
}
187192

188-
ResponseStream = null;
189-
return _baseOutputStream;
193+
headersSent = true;
194+
return chunked switch {
195+
true => new HttpChunkedWriteStream ( _baseOutputStream ),
196+
false => _baseOutputStream
197+
};
190198
}
191199

192200
internal HttpResponse ( HttpHostContext session, Stream httpSessionStream ) {

cadente/Sisk.Cadente/Streams/HttpChunkedStream.cs renamed to cadente/Sisk.Cadente/Streams/HttpChunkedReadStream.cs

Lines changed: 39 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -7,20 +7,23 @@
77
// File name: HttpChunkedStream.cs
88
// Repository: https://github.com/sisk-http/core
99

10+
using System.Buffers;
1011
using System.Text;
1112

1213
namespace Sisk.Cadente.Streams;
1314

14-
internal class HttpChunkedStream : Stream {
15+
internal class HttpChunkedReadStream : Stream {
1516
private Stream _stream;
17+
private byte [] _buffer;
1618
int written = 0;
1719
bool innerStreamReturnedZero = false;
1820

19-
const int CHUNKED_MAX_SIZE = 4096;
20-
static readonly byte [] CrLf = [ 0x0D, 0x0A ];
21+
private static readonly byte [] s_crlfBytes = "\r\n"u8.ToArray ();
22+
private static readonly byte [] s_finalChunkBytes = "0\r\n\r\n"u8.ToArray ();
2123

22-
public HttpChunkedStream ( Stream stream ) {
24+
public HttpChunkedReadStream ( Stream stream ) {
2325
_stream = stream;
26+
_buffer = ArrayPool<byte>.Shared.Rent ( 128 * 1024 );
2427
}
2528

2629
public override bool CanRead => _stream.CanRead;
@@ -42,34 +45,30 @@ public override int Read ( byte [] buffer, int offset, int count ) {
4245
if (innerStreamReturnedZero)
4346
return 0;
4447

45-
Span<byte> destination = buffer.AsSpan ( offset, count );
46-
if (destination.Length < 2)
47-
throw new ArgumentException ( "The provided buffer slice must be at least 2 bytes long.", nameof ( count ) );
48-
49-
Span<byte> readBuffer = stackalloc byte [ Math.Min ( destination.Length - 2, CHUNKED_MAX_SIZE ) ];
50-
int read = _stream.Read ( readBuffer );
51-
byte [] readBytesEncoded = Encoding.ASCII.GetBytes ( $"{read:x}\r\n" );
52-
53-
if (destination.Length < readBytesEncoded.Length + read + 2)
54-
throw new ArgumentException ( "The provided buffer slice is not large enough to hold the chunked response.", nameof ( count ) );
55-
56-
if (read == 0) {
48+
var position = 0;
49+
var bytesRead = _stream.Read ( _buffer, 0, Math.Min ( _buffer.Length, count ) );
50+
if (bytesRead == 0) {
5751
innerStreamReturnedZero = true;
52+
53+
// write last chunk
54+
WriteToBuffer ( s_finalChunkBytes );
55+
return s_finalChunkBytes.Length;
5856
}
5957

60-
ReadOnlySpan<byte> headerSpan = readBytesEncoded.AsSpan ();
61-
headerSpan.CopyTo ( destination );
58+
var bytesReadHex = Encoding.ASCII.GetBytes ( bytesRead.ToString ( "X" ) );
6259

63-
int copyStart = headerSpan.Length;
64-
readBuffer [ 0..read ].CopyTo ( destination [ copyStart.. ] );
60+
WriteToBuffer ( bytesReadHex );
61+
WriteToBuffer ( s_crlfBytes );
62+
WriteToBuffer ( _buffer.AsSpan ( 0, bytesRead ) );
63+
WriteToBuffer ( s_crlfBytes );
6564

66-
int bufferEnd = headerSpan.Length + read;
67-
destination [ bufferEnd + 0 ] = 0x0D;
68-
destination [ bufferEnd + 1 ] = 0x0A;
65+
return position;
6966

70-
written += read;
67+
void WriteToBuffer ( Span<byte> data ) {
7168

72-
return bufferEnd + 2;
69+
data.CopyTo ( buffer.AsSpan ( offset + position ) );
70+
position += data.Length;
71+
}
7372
}
7473

7574
public override long Seek ( long offset, SeekOrigin origin ) {
@@ -83,4 +82,19 @@ public override void SetLength ( long value ) {
8382
public override void Write ( byte [] buffer, int offset, int count ) {
8483
throw new NotSupportedException ();
8584
}
85+
86+
protected override void Dispose ( bool disposing ) {
87+
88+
if (_stream == null)
89+
return;
90+
91+
if (disposing) {
92+
93+
_stream.Dispose ();
94+
ArrayPool<byte>.Shared.Return ( _buffer );
95+
96+
_stream = null!;
97+
_buffer = null!;
98+
}
99+
}
86100
}

0 commit comments

Comments
 (0)