@@ -112,6 +112,7 @@ public class WebSocket : NSObject, NSStreamDelegate {
112
112
public var security : SSLSecurity ?
113
113
public var enabledSSLCipherSuites : [ SSLCipherSuite ] ?
114
114
public var origin : String ?
115
+ public var timeout = 5
115
116
public var isConnected : Bool {
116
117
return connected
117
118
}
@@ -319,12 +320,12 @@ public class WebSocket : NSObject, NSStreamDelegate {
319
320
self . mutex. unlock ( )
320
321
321
322
let bytes = UnsafePointer < UInt8 > ( data. bytes)
322
- var timeout = 5000000 //wait 5 seconds before giving up
323
+ var out = timeout * 1000000 //wait 5 seconds before giving up
323
324
writeQueue. addOperationWithBlock { [ weak self] in
324
325
while !outStream. hasSpaceAvailable {
325
326
usleep ( 100 ) //wait until the socket is ready
326
- timeout -= 100
327
- if timeout < 0 {
327
+ out -= 100
328
+ if out < 0 {
328
329
self ? . cleanupStream ( )
329
330
self ? . doDisconnect ( self ? . errorWithDetail ( " write wait timed out " , code: 2 ) )
330
331
return
@@ -405,25 +406,24 @@ public class WebSocket : NSObject, NSStreamDelegate {
405
406
}
406
407
///dequeue the incoming input so it is processed in order
407
408
private func dequeueInput( ) {
408
- guard !inputQueue. isEmpty else { return }
409
-
410
- let data = inputQueue [ 0 ]
411
- var work = data
412
- if let fragBuffer = fragBuffer {
413
- let combine = NSMutableData ( data: fragBuffer)
414
- combine. appendData ( data)
415
- work = combine
416
- self . fragBuffer = nil
417
- }
418
- let buffer = UnsafePointer < UInt8 > ( work. bytes)
419
- let length = work. length
420
- if !connected {
421
- processTCPHandshake ( buffer, bufferLen: length)
422
- } else {
423
- processRawMessage ( buffer, bufferLen: length)
409
+ while !inputQueue. isEmpty {
410
+ let data = inputQueue [ 0 ]
411
+ var work = data
412
+ if let fragBuffer = fragBuffer {
413
+ let combine = NSMutableData ( data: fragBuffer)
414
+ combine. appendData ( data)
415
+ work = combine
416
+ self . fragBuffer = nil
417
+ }
418
+ let buffer = UnsafePointer < UInt8 > ( work. bytes)
419
+ let length = work. length
420
+ if !connected {
421
+ processTCPHandshake ( buffer, bufferLen: length)
422
+ } else {
423
+ processRawMessagesInBuffer ( buffer, bufferLen: length)
424
+ }
425
+ inputQueue = inputQueue. filter { $0 != data}
424
426
}
425
- inputQueue = inputQueue. filter { $0 != data}
426
- dequeueInput ( )
427
427
}
428
428
429
429
//handle checking the inital connection status
@@ -469,7 +469,7 @@ public class WebSocket : NSObject, NSStreamDelegate {
469
469
totalSize += 1 //skip the last \n
470
470
let restSize = bufferLen - totalSize
471
471
if restSize > 0 {
472
- processRawMessage ( ( buffer+ totalSize) , bufferLen: restSize)
472
+ processRawMessagesInBuffer ( buffer + totalSize, bufferLen: restSize)
473
473
}
474
474
return 0 //success
475
475
}
@@ -522,12 +522,15 @@ public class WebSocket : NSObject, NSStreamDelegate {
522
522
}
523
523
}
524
524
525
- ///process the websocket data
526
- private func processRawMessage( buffer: UnsafePointer < UInt8 > , bufferLen: Int) {
525
+ /// Process one message at the start of `buffer`. Return another buffer (sharing storage) that contains the leftover contents of `buffer` that I didn't process.
526
+ @warn_unused_result
527
+ private func processOneRawMessage( inBuffer buffer: UnsafeBufferPointer < UInt8 > ) -> UnsafeBufferPointer< UInt8 > {
527
528
let response = readStack. last
529
+ let baseAddress = buffer. baseAddress
530
+ let bufferLen = buffer. count
528
531
if response != nil && bufferLen < 2 {
529
- fragBuffer = NSData ( bytes : buffer, length : bufferLen )
530
- return
532
+ fragBuffer = NSData ( buffer: buffer )
533
+ return emptyBuffer
531
534
}
532
535
if let response = response where response. bytesLeft > 0 {
533
536
var len = response. bytesLeft
@@ -537,45 +540,41 @@ public class WebSocket : NSObject, NSStreamDelegate {
537
540
extra = 0
538
541
}
539
542
response. bytesLeft -= len
540
- response. buffer? . appendData ( NSData ( bytes: buffer , length: len) )
543
+ response. buffer? . appendData ( NSData ( bytes: baseAddress , length: len) )
541
544
processResponse ( response)
542
- let offset = bufferLen - extra
543
- if extra > 0 {
544
- processExtra ( ( buffer+ offset) , bufferLen: extra)
545
- }
546
- return
545
+ return buffer. fromOffset ( bufferLen - extra)
547
546
} else {
548
- let isFin = ( FinMask & buffer [ 0 ] )
549
- let receivedOpcode = OpCode ( rawValue: ( OpCodeMask & buffer [ 0 ] ) )
550
- let isMasked = ( MaskMask & buffer [ 1 ] )
551
- let payloadLen = ( PayloadLenMask & buffer [ 1 ] )
547
+ let isFin = ( FinMask & baseAddress [ 0 ] )
548
+ let receivedOpcode = OpCode ( rawValue: ( OpCodeMask & baseAddress [ 0 ] ) )
549
+ let isMasked = ( MaskMask & baseAddress [ 1 ] )
550
+ let payloadLen = ( PayloadLenMask & baseAddress [ 1 ] )
552
551
var offset = 2
553
- if ( isMasked > 0 || ( RSVMask & buffer [ 0 ] ) > 0 ) && receivedOpcode != . Pong {
552
+ if ( isMasked > 0 || ( RSVMask & baseAddress [ 0 ] ) > 0 ) && receivedOpcode != . Pong {
554
553
let errCode = CloseCode . ProtocolError. rawValue
555
554
doDisconnect ( errorWithDetail ( " masked and rsv data is not currently supported " , code: errCode) )
556
555
writeError ( errCode)
557
- return
556
+ return emptyBuffer
558
557
}
559
558
let isControlFrame = ( receivedOpcode == . ConnectionClose || receivedOpcode == . Ping)
560
559
if !isControlFrame && ( receivedOpcode != . BinaryFrame && receivedOpcode != . ContinueFrame &&
561
560
receivedOpcode != . TextFrame && receivedOpcode != . Pong) {
562
561
let errCode = CloseCode . ProtocolError. rawValue
563
562
doDisconnect ( errorWithDetail ( " unknown opcode: \( receivedOpcode) " , code: errCode) )
564
563
writeError ( errCode)
565
- return
564
+ return emptyBuffer
566
565
}
567
566
if isControlFrame && isFin == 0 {
568
567
let errCode = CloseCode . ProtocolError. rawValue
569
568
doDisconnect ( errorWithDetail ( " control frames can't be fragmented " , code: errCode) )
570
569
writeError ( errCode)
571
- return
570
+ return emptyBuffer
572
571
}
573
572
if receivedOpcode == . ConnectionClose {
574
573
var code = CloseCode . Normal. rawValue
575
574
if payloadLen == 1 {
576
575
code = CloseCode . ProtocolError. rawValue
577
576
} else if payloadLen > 1 {
578
- code = WebSocket . readUint16 ( buffer , offset: offset)
577
+ code = WebSocket . readUint16 ( baseAddress , offset: offset)
579
578
if code < 1000 || ( code > 1003 && code < 1007 ) || ( code > 1011 && code < 3000 ) {
580
579
code = CloseCode . ProtocolError. rawValue
581
580
}
@@ -584,7 +583,7 @@ public class WebSocket : NSObject, NSStreamDelegate {
584
583
if payloadLen > 2 {
585
584
let len = Int ( payloadLen- 2 )
586
585
if len > 0 {
587
- let bytes = UnsafePointer < UInt8 > ( ( buffer + offset) )
586
+ let bytes = baseAddress + offset
588
587
let str : NSString ? = NSString ( data: NSData ( bytes: bytes, length: len) , encoding: NSUTF8StringEncoding)
589
588
if str == nil {
590
589
code = CloseCode . ProtocolError. rawValue
@@ -593,23 +592,23 @@ public class WebSocket : NSObject, NSStreamDelegate {
593
592
}
594
593
doDisconnect ( errorWithDetail ( " connection closed by server " , code: code) )
595
594
writeError ( code)
596
- return
595
+ return emptyBuffer
597
596
}
598
597
if isControlFrame && payloadLen > 125 {
599
598
writeError ( CloseCode . ProtocolError. rawValue)
600
- return
599
+ return emptyBuffer
601
600
}
602
601
var dataLength = UInt64 ( payloadLen)
603
602
if dataLength == 127 {
604
- dataLength = WebSocket . readUint64 ( buffer , offset: offset)
603
+ dataLength = WebSocket . readUint64 ( baseAddress , offset: offset)
605
604
offset += sizeof ( UInt64)
606
605
} else if dataLength == 126 {
607
- dataLength = UInt64 ( WebSocket . readUint16 ( buffer , offset: offset) )
606
+ dataLength = UInt64 ( WebSocket . readUint16 ( baseAddress , offset: offset) )
608
607
offset += sizeof ( UInt16)
609
608
}
610
609
if bufferLen < offset || UInt64 ( bufferLen - offset) < dataLength {
611
- fragBuffer = NSData ( bytes: buffer , length: bufferLen)
612
- return
610
+ fragBuffer = NSData ( bytes: baseAddress , length: bufferLen)
611
+ return emptyBuffer
613
612
}
614
613
var len = dataLength
615
614
if dataLength > UInt64 ( bufferLen) {
@@ -620,7 +619,7 @@ public class WebSocket : NSObject, NSStreamDelegate {
620
619
len = 0
621
620
data = NSData ( )
622
621
} else {
623
- data = NSData ( bytes: UnsafePointer < UInt8 > ( ( buffer + offset) ) , length: Int ( len) )
622
+ data = NSData ( bytes: baseAddress + offset, length: Int ( len) )
624
623
}
625
624
if receivedOpcode == . Pong {
626
625
if canDispatch {
@@ -630,12 +629,7 @@ public class WebSocket : NSObject, NSStreamDelegate {
630
629
s. pongDelegate? . websocketDidReceivePong ( s)
631
630
}
632
631
}
633
- let step = Int ( offset+ numericCast( len) )
634
- let extra = bufferLen- step
635
- if extra > 0 {
636
- processRawMessage ( ( buffer+ step) , bufferLen: extra)
637
- }
638
- return
632
+ return buffer. fromOffset ( offset + Int( len) )
639
633
}
640
634
var response = readStack. last
641
635
if isControlFrame {
@@ -645,7 +639,7 @@ public class WebSocket : NSObject, NSStreamDelegate {
645
639
let errCode = CloseCode . ProtocolError. rawValue
646
640
doDisconnect ( errorWithDetail ( " continue frame before a binary or text frame " , code: errCode) )
647
641
writeError ( errCode)
648
- return
642
+ return emptyBuffer
649
643
}
650
644
var isNew = false
651
645
if response == nil {
@@ -654,7 +648,7 @@ public class WebSocket : NSObject, NSStreamDelegate {
654
648
doDisconnect ( errorWithDetail ( " first frame can't be a continue frame " ,
655
649
code: errCode) )
656
650
writeError ( errCode)
657
- return
651
+ return emptyBuffer
658
652
}
659
653
isNew = true
660
654
response = WSResponse ( )
@@ -669,7 +663,7 @@ public class WebSocket : NSObject, NSStreamDelegate {
669
663
doDisconnect ( errorWithDetail ( " second and beyond of fragment message must be a continue frame " ,
670
664
code: errCode) )
671
665
writeError ( errCode)
672
- return
666
+ return emptyBuffer
673
667
}
674
668
response!. buffer!. appendData ( data)
675
669
}
@@ -684,20 +678,18 @@ public class WebSocket : NSObject, NSStreamDelegate {
684
678
}
685
679
686
680
let step = Int ( offset+ numericCast( len) )
687
- let extra = bufferLen- step
688
- if extra > 0 {
689
- processExtra ( ( buffer+ step) , bufferLen: extra)
690
- }
681
+ return buffer. fromOffset ( step)
691
682
}
692
-
693
683
}
694
684
695
- ///process the extra of a buffer
696
- private func processExtra( buffer: UnsafePointer < UInt8 > , bufferLen: Int) {
697
- if bufferLen < 2 {
698
- fragBuffer = NSData ( bytes: buffer, length: bufferLen)
699
- } else {
700
- processRawMessage ( buffer, bufferLen: bufferLen)
685
+ /// Process all messages in the buffer if possible.
686
+ private func processRawMessagesInBuffer( pointer: UnsafePointer < UInt8 > , bufferLen: Int) {
687
+ var buffer = UnsafeBufferPointer ( start: pointer, count: bufferLen)
688
+ repeat {
689
+ buffer = processOneRawMessage ( inBuffer: buffer)
690
+ } while buffer. count >= 2
691
+ if buffer. count > 0 {
692
+ fragBuffer = NSData ( buffer: buffer)
701
693
}
702
694
}
703
695
@@ -835,6 +827,25 @@ public class WebSocket : NSObject, NSStreamDelegate {
835
827
836
828
}
837
829
830
+ private extension NSData {
831
+
832
+ convenience init( buffer: UnsafeBufferPointer < UInt8 > ) {
833
+ self . init ( bytes: buffer. baseAddress, length: buffer. count)
834
+ }
835
+
836
+ }
837
+
838
+ private extension UnsafeBufferPointer {
839
+
840
+ func fromOffset( offset: Int ) -> UnsafeBufferPointer < Element > {
841
+ return UnsafeBufferPointer < Element > ( start: baseAddress. advancedBy ( offset) , count: count - offset)
842
+ }
843
+
844
+ }
845
+
846
+ private let emptyBuffer = UnsafeBufferPointer < UInt8 > ( start: nil , count: 0 )
847
+
848
+
838
849
public class SSLCert {
839
850
var certData : NSData ?
840
851
var key : SecKeyRef ?
0 commit comments