@@ -553,52 +553,116 @@ end
553
553
end
554
554
end
555
555
556
- @testset " HTTP CONNECT Proxy pool" begin
557
- # Stores the http request passed by the client
558
- messages = []
559
- streams = Set ()
560
-
561
- function forwardstream (src, dst)
562
- while isopen (dst) && isopen (src) && ! eof (src)
563
- buff = readavailable (src)
564
- ! isempty (buff) && isopen (dst) && write (dst, buff)
565
- end
556
+ function forwardstream (src, dst)
557
+ while isopen (dst) && isopen (src) && ! eof (src)
558
+ buff = readavailable (src)
559
+ ! isempty (buff) && isopen (dst) && write (dst, buff)
560
+ end
561
+ end
566
562
567
- close (src)
568
- close (dst)
563
+ @testset " HTTP CONNECT Proxy pool" begin
564
+ function forwardclosetask (src, dst)
565
+ errormonitor (@async begin
566
+ forwardstream (src, dst)
567
+ close (src)
568
+ close (dst)
569
+ end )
569
570
end
570
571
571
- # Simple implementation of a proxy server
572
+ # Stores the http message passed by the client
573
+ messages = []
574
+ upstreams = Set ()
575
+
576
+ # Simple implementation of a https proxy server
572
577
proxy = HTTP. listen! (IPv4 (0 ), 8082 ; stream = true ) do http:: HTTP.Stream
573
578
push! (messages, http. message)
574
- host, port = split (http. message. target, " :" )
575
- targetstream = connect (host, parse (Int, port))
576
- HTTP. setstatus (http, 200 )
577
- HTTP. startwrite (http)
578
- up = @async forwardstream (http. stream. io, targetstream)
579
- down = @async forwardstream (targetstream, http. stream. io)
580
- push! (streams, targetstream)
581
- wait (up)
582
- wait (down)
583
- delete! (streams, targetstream)
579
+
580
+ hostport = split (http. message. target, " :" )
581
+ targetstream = connect (hostport[1 ], parse (Int, get (hostport, 2 , " 443" )))
582
+ push! (upstreams, targetstream)
583
+ try
584
+ HTTP. setstatus (http, 200 )
585
+ HTTP. startwrite (http)
586
+ up = forwardclosetask (http. stream. io, targetstream)
587
+ down = forwardclosetask (targetstream, http. stream. io)
588
+
589
+ wait (up)
590
+ wait (down)
591
+ finally
592
+ delete! (upstreams, targetstream)
593
+ end
584
594
end
585
595
586
596
try
587
597
# Make the HTTP request
588
- r1 = HTTP. get (" https://example.com:443 " ; proxy= " http://localhost:8082" , retry= false , status_exception= true )
598
+ r1 = HTTP. get (" https://$httpbin /ip " ; proxy= " http://localhost:8082" , retry= false , status_exception= true )
589
599
@test length (messages) == 1
590
600
@test first (messages). method == " CONNECT"
591
- @test length (streams ) == 1 && isopen (first (streams )) # still alive
601
+ @test length (upstreams ) == 1 && isopen (first (upstreams )) # still alive
592
602
593
603
# Make another request
594
604
# This should reuse the connection pool and not make another request to the proxy
595
605
empty! (messages)
596
- r2 = HTTP. get (" https://example.com:443" ; proxy= " http://localhost:8082" , retry= false , status_exception= true )
597
- @test isempty (messages)
598
- @test r1. body == r2. body # no new message to the proxy yet successfully get the same response from the target server
599
- @test length (streams) == 1 && isopen (first (streams)) # still only one stream alive
606
+ r2 = HTTP. get (" https://$httpbin /ip" ; proxy= " http://localhost:8082" , retry= false , status_exception= true )
607
+ @test isempty (messages) # no new message to the proxy
608
+ @test length (upstreams) == 1 && isopen (first (upstreams)) # still only one stream alive
609
+ finally
610
+ close .(upstreams)
611
+ close (proxy)
612
+ HTTP. Connections. closeall ()
613
+ wait (proxy)
614
+ end
615
+ end
616
+
617
+ @testset " HTTP Proxy pool" begin
618
+ # Stores the http request passed by the client
619
+ downstreamconnections = Set {HTTP.Connections.Connection} ()
620
+ upstreamconnections = Set {HTTP.Connections.Connection} ()
621
+ finished_request = Base. Event (true )
622
+
623
+ # Simple implementation of a http proxy server
624
+ proxy = HTTP. listen! (IPv4 (0 ), 8082 ; stream = true ) do http:: HTTP.Stream
625
+ push! (downstreamconnections, http. stream)
626
+
627
+ HTTP. open (http. message. method, http. message. target, http. message. headers;
628
+ decompress = false , version = http. message. version, retry= false ,
629
+ redirect = false ) do targetstream
630
+ push! (upstreamconnections, targetstream. stream)
631
+
632
+ up = errormonitor (@async forwardstream (http, targetstream))
633
+ targetresponse = startread (targetstream)
634
+
635
+ HTTP. setstatus (http, targetresponse. status)
636
+ for h in targetresponse. headers
637
+ HTTP. setheader (http, h)
638
+ end
639
+
640
+ HTTP. startwrite (http)
641
+ down = errormonitor (@async forwardstream (targetstream, http))
642
+
643
+ wait (up)
644
+ wait (down)
645
+
646
+ notify (finished_request)
647
+ end
648
+ end
649
+
650
+ try
651
+ # Make the HTTP request
652
+ r1 = HTTP. get (" http://$httpbin /ip" ; proxy= " http://localhost:8082" , retry= false , redirect = false , status_exception= true )
653
+ wait (finished_request)
654
+ @test length (downstreamconnections) == 1
655
+ @test length (upstreamconnections) == 1
656
+
657
+ # Make another request
658
+ # This should reuse a connection pool in both the client and proxy
659
+ r2 = HTTP. get (" http://$httpbin /ip" ; proxy= " http://localhost:8082" , retry= false , redirect = false , status_exception= true )
660
+
661
+ # Check that notify was actually called, but that the set of connections remains of size 1
662
+ wait (finished_request)
663
+ @test length (downstreamconnections) == 1
664
+ @test length (upstreamconnections) == 1
600
665
finally
601
- close .(streams)
602
666
close (proxy)
603
667
HTTP. Connections. closeall ()
604
668
wait (proxy)
0 commit comments