Skip to content

Commit 6bf1f6f

Browse files
ragansalehecka
authored andcommitted
Encapsulate frame management into connectionAutomaton (#400)
* - frameEnvelope PR cleaned up * - Removes unnecessary includes - Fixes camelCase variable names - Removes frameSerializer() - Refactors previous uses of frameSerializer()
1 parent 4709317 commit 6bf1f6f

20 files changed

+250
-271
lines changed

src/ConnectionAutomaton.cpp

Lines changed: 149 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -372,23 +372,23 @@ void ConnectionAutomaton::processFrameImpl(
372372
return;
373373
}
374374

375-
auto frameType = frameSerializer().peekFrameType(*frame);
375+
auto frameType = frameSerializer_->peekFrameType(*frame);
376376
stats_->frameRead(frameType);
377377

378378
// TODO(tmont): If a frame is invalid, it will still be tracked. However, we
379379
// actually want that. We want to keep
380380
// each side in sync, even if a frame is invalid.
381381
resumeCache_->trackReceivedFrame(*frame, frameType);
382382

383-
auto streamIdPtr = frameSerializer().peekStreamId(*frame);
383+
auto streamIdPtr = frameSerializer_->peekStreamId(*frame);
384384
if (!streamIdPtr) {
385385
// Failed to deserialize the frame.
386386
closeWithError(Frame_ERROR::invalidFrame());
387387
return;
388388
}
389389
auto streamId = *streamIdPtr;
390390
if (streamId == 0) {
391-
onConnectionFrame(std::move(frame));
391+
handleConnectionFrame(frameType, std::move(frame));
392392
return;
393393
}
394394

@@ -402,14 +402,7 @@ void ConnectionAutomaton::processFrameImpl(
402402
return;
403403
}
404404

405-
auto it = streamState_->streams_.find(streamId);
406-
if (it == streamState_->streams_.end()) {
407-
handleUnknownStream(streamId, std::move(frame));
408-
return;
409-
}
410-
auto automaton = it->second;
411-
// Can deliver the frame.
412-
automaton->onNextFrame(std::move(frame));
405+
handleStreamFrame(streamId, frameType, std::move(frame));
413406
}
414407

415408
void ConnectionAutomaton::onTerminal(folly::exception_wrapper ex) {
@@ -430,10 +423,10 @@ void ConnectionAutomaton::onTerminalImpl(folly::exception_wrapper ex) {
430423
}
431424
}
432425

433-
void ConnectionAutomaton::onConnectionFrame(
426+
void ConnectionAutomaton::handleConnectionFrame(
427+
FrameType frameType,
434428
std::unique_ptr<folly::IOBuf> payload) {
435-
auto type = frameSerializer().peekFrameType(*payload);
436-
switch (type) {
429+
switch (frameType) {
437430
case FrameType::KEEPALIVE: {
438431
Frame_KEEPALIVE frame;
439432
if (!deserializeFrameOrError(
@@ -479,7 +472,7 @@ void ConnectionAutomaton::onConnectionFrame(
479472
frame.moveToSetupPayload(setupPayload);
480473

481474
// this should be already set to the correct version
482-
if (frameSerializer().protocolVersion() != setupPayload.protocolVersion) {
475+
if (frameSerializer_->protocolVersion() != setupPayload.protocolVersion) {
483476
closeWithError(Frame_ERROR::badSetupFrame("invalid protocol version"));
484477
return;
485478
}
@@ -574,20 +567,86 @@ void ConnectionAutomaton::onConnectionFrame(
574567
}
575568
}
576569

570+
void ConnectionAutomaton::handleStreamFrame(
571+
StreamId streamId,
572+
FrameType frameType,
573+
std::unique_ptr<folly::IOBuf> serializedFrame) {
574+
auto it = streamState_->streams_.find(streamId);
575+
if (it == streamState_->streams_.end()) {
576+
handleUnknownStream(streamId, frameType, std::move(serializedFrame));
577+
return;
578+
}
579+
auto &automaton = it->second;
580+
581+
switch (frameType) {
582+
case FrameType::REQUEST_N: {
583+
Frame_REQUEST_N frameRequestN;
584+
if (!deserializeFrameOrError(frameRequestN,
585+
std::move(serializedFrame))) {
586+
return;
587+
}
588+
automaton->handleRequestN(frameRequestN.requestN_);
589+
break;
590+
}
591+
case FrameType::CANCEL: {
592+
automaton->handleCancel();
593+
break;
594+
}
595+
case FrameType::PAYLOAD: {
596+
Frame_PAYLOAD framePayload;
597+
if (!deserializeFrameOrError(framePayload,
598+
std::move(serializedFrame))) {
599+
return;
600+
}
601+
automaton->handlePayload(std::move(framePayload.payload_),
602+
framePayload.header_.flagsComplete(),
603+
framePayload.header_.flagsNext());
604+
break;
605+
}
606+
case FrameType::ERROR: {
607+
Frame_ERROR frameError;
608+
if (!deserializeFrameOrError(frameError,
609+
std::move(serializedFrame))) {
610+
return;
611+
}
612+
automaton->handleError(std::runtime_error(frameError.payload_.moveDataToString()));
613+
break;
614+
}
615+
case FrameType::REQUEST_CHANNEL:
616+
case FrameType::REQUEST_RESPONSE:
617+
case FrameType::RESERVED:
618+
case FrameType::SETUP:
619+
case FrameType::LEASE:
620+
case FrameType::KEEPALIVE:
621+
case FrameType::REQUEST_FNF:
622+
case FrameType::REQUEST_STREAM:
623+
case FrameType::METADATA_PUSH:
624+
case FrameType::RESUME:
625+
case FrameType::RESUME_OK:
626+
case FrameType::EXT:
627+
closeWithError(Frame_ERROR::unexpectedFrame());
628+
break;
629+
default:
630+
// because of compatibility with future frame types we will just ignore
631+
// unknown frames
632+
break;
633+
}
634+
}
635+
577636
void ConnectionAutomaton::handleUnknownStream(
578637
StreamId streamId,
638+
FrameType frameType,
579639
std::unique_ptr<folly::IOBuf> serializedFrame) {
580640
DCHECK(streamId != 0);
581641
// TODO: comparing string versions is odd because from version
582642
// 10.0 the lexicographic comparison doesn't work
583643
// we should change the version to struct
584-
if (frameSerializer().protocolVersion() > ProtocolVersion{0, 0} &&
644+
if (frameSerializer_->protocolVersion() > ProtocolVersion{0, 0} &&
585645
!streamsFactory_.registerNewPeerStreamId(streamId)) {
586646
return;
587647
}
588648

589-
auto type = frameSerializer().peekFrameType(*serializedFrame);
590-
switch (type) {
649+
switch (frameType) {
591650
case FrameType::REQUEST_CHANNEL: {
592651
Frame_REQUEST_CHANNEL frame;
593652
if (!deserializeFrameOrError(frame, std::move(serializedFrame))) {
@@ -647,7 +706,7 @@ void ConnectionAutomaton::handleUnknownStream(
647706
case FrameType::EXT:
648707
default:
649708
DLOG(ERROR) << "unknown stream frame (streamId=" << streamId
650-
<< " frameType=" << type << ")";
709+
<< " frameType=" << frameType << ")";
651710
closeWithError(Frame_ERROR::unexpectedFrame());
652711
}
653712
}
@@ -664,7 +723,22 @@ void ConnectionAutomaton::sendKeepalive(
664723
Frame_KEEPALIVE pingFrame(
665724
flags, resumeCache_->impliedPosition(), std::move(data));
666725
outputFrameOrEnqueue(
667-
frameSerializer().serializeOut(std::move(pingFrame), remoteResumeable_));
726+
frameSerializer_->serializeOut(std::move(pingFrame), remoteResumeable_));
727+
}
728+
729+
void ConnectionAutomaton::tryClientResume(
730+
const ResumeIdentificationToken& token,
731+
std::shared_ptr<FrameTransport> frameTransport,
732+
std::unique_ptr<ClientResumeStatusCallback> resumeCallback) {
733+
frameTransport->outputFrameOrEnqueue(frameSerializer_->serializeOut(
734+
createResumeFrame(token)));
735+
736+
// if the client was still connected we will disconnected the old connection
737+
// with a clear error message
738+
disconnect(
739+
std::runtime_error("resuming client on a different connection"));
740+
setResumable(true);
741+
reconnect(std::move(frameTransport), std::move(resumeCallback));
668742
}
669743

670744
Frame_RESUME ConnectionAutomaton::createResumeFrame(
@@ -673,7 +747,7 @@ Frame_RESUME ConnectionAutomaton::createResumeFrame(
673747
token,
674748
resumeCache_->impliedPosition(),
675749
resumeCache_->lastResetPosition(),
676-
frameSerializer().protocolVersion());
750+
frameSerializer_->protocolVersion());
677751
}
678752

679753
bool ConnectionAutomaton::isPositionAvailable(ResumePosition position) {
@@ -694,7 +768,7 @@ bool ConnectionAutomaton::resumeFromPositionOrClose(
694768

695769
if (clientPositionExist &&
696770
resumeCache_->isPositionAvailable(serverPosition)) {
697-
frameTransport_->outputFrameOrEnqueue(frameSerializer().serializeOut(
771+
frameTransport_->outputFrameOrEnqueue(frameSerializer_->serializeOut(
698772
Frame_RESUME_OK(resumeCache_->impliedPosition())));
699773
resumeFromPosition(serverPosition);
700774
return true;
@@ -738,14 +812,27 @@ void ConnectionAutomaton::outputFrameOrEnqueue(
738812
}
739813
}
740814

815+
void ConnectionAutomaton::requestFireAndForget(Payload request) {
816+
Frame_REQUEST_FNF frame(
817+
streamsFactory().getNextStreamId(),
818+
FrameFlags::EMPTY,
819+
std::move(std::move(request)));
820+
outputFrameOrEnqueue(frameSerializer_->serializeOut(std::move(frame)));
821+
}
822+
823+
void ConnectionAutomaton::metadataPush(std::unique_ptr<folly::IOBuf> metadata) {
824+
outputFrameOrEnqueue(frameSerializer_->serializeOut(
825+
Frame_METADATA_PUSH(std::move(metadata))));
826+
}
827+
741828
void ConnectionAutomaton::outputFrame(std::unique_ptr<folly::IOBuf> frame) {
742829
DCHECK(!isDisconnectedOrClosed());
743830

744-
auto frameType = frameSerializer().peekFrameType(*frame);
831+
auto frameType = frameSerializer_->peekFrameType(*frame);
745832
stats_->frameWritten(frameType);
746833

747834
if (isResumable_) {
748-
auto streamIdPtr = frameSerializer().peekStreamId(*frame);
835+
auto streamIdPtr = frameSerializer_->peekStreamId(*frame);
749836
resumeCache_->trackSentFrame(*frame, frameType, streamIdPtr);
750837
}
751838
frameTransport_->outputFrameOrEnqueue(std::move(frame));
@@ -800,9 +887,34 @@ void ConnectionAutomaton::setFrameSerializer(
800887
frameSerializer_ = std::move(frameSerializer);
801888
}
802889

803-
FrameSerializer& ConnectionAutomaton::frameSerializer() const {
804-
CHECK(frameSerializer_);
805-
return *frameSerializer_;
890+
void ConnectionAutomaton::setUpFrame(std::shared_ptr<FrameTransport> frameTransport,
891+
ConnectionSetupPayload setupPayload) {
892+
auto protocolVersion = getSerializerProtocolVersion();
893+
894+
Frame_SETUP frame(
895+
setupPayload.resumable ? FrameFlags::RESUME_ENABLE : FrameFlags::EMPTY,
896+
protocolVersion.major,
897+
protocolVersion.minor,
898+
getKeepaliveTime(),
899+
Frame_SETUP::kMaxLifetime,
900+
setupPayload.token,
901+
std::move(setupPayload.metadataMimeType),
902+
std::move(setupPayload.dataMimeType),
903+
std::move(setupPayload.payload));
904+
905+
// TODO: when the server returns back that it doesn't support resumability, we
906+
// should retry without resumability
907+
908+
// making sure we send setup frame first
909+
frameTransport->outputFrameOrEnqueue(
910+
frameSerializer_->serializeOut(std::move(frame)));
911+
// then the rest of the cached frames will be sent
912+
connect(
913+
std::move(frameTransport), true, ProtocolVersion::Unknown);
914+
}
915+
916+
ProtocolVersion ConnectionAutomaton::getSerializerProtocolVersion() {
917+
return frameSerializer_->protocolVersion();
806918
}
807919

808920
void ConnectionAutomaton::writeNewStream(
@@ -813,26 +925,26 @@ void ConnectionAutomaton::writeNewStream(
813925
bool completed) {
814926
switch (streamType) {
815927
case StreamType::CHANNEL:
816-
outputFrameOrEnqueue(frameSerializer().serializeOut(Frame_REQUEST_CHANNEL(
928+
outputFrameOrEnqueue(frameSerializer_->serializeOut(Frame_REQUEST_CHANNEL(
817929
streamId,
818930
completed ? FrameFlags::COMPLETE : FrameFlags::EMPTY,
819931
initialRequestN,
820932
std::move(payload))));
821933
break;
822934

823935
case StreamType::STREAM:
824-
outputFrameOrEnqueue(frameSerializer().serializeOut(Frame_REQUEST_STREAM(
936+
outputFrameOrEnqueue(frameSerializer_->serializeOut(Frame_REQUEST_STREAM(
825937
streamId, FrameFlags::EMPTY, initialRequestN, std::move(payload))));
826938
break;
827939

828940
case StreamType::REQUEST_RESPONSE:
829941
outputFrameOrEnqueue(
830-
frameSerializer().serializeOut(Frame_REQUEST_RESPONSE(
942+
frameSerializer_->serializeOut(Frame_REQUEST_RESPONSE(
831943
streamId, FrameFlags::EMPTY, std::move(payload))));
832944
break;
833945

834946
case StreamType::FNF:
835-
outputFrameOrEnqueue(frameSerializer().serializeOut(
947+
outputFrameOrEnqueue(frameSerializer_->serializeOut(
836948
Frame_REQUEST_FNF(streamId, FrameFlags::EMPTY, std::move(payload))));
837949
break;
838950

@@ -843,7 +955,7 @@ void ConnectionAutomaton::writeNewStream(
843955

844956
void ConnectionAutomaton::writeRequestN(StreamId streamId, uint32_t n) {
845957
outputFrameOrEnqueue(
846-
frameSerializer().serializeOut(Frame_REQUEST_N(streamId, n)));
958+
frameSerializer_->serializeOut(Frame_REQUEST_N(streamId, n)));
847959
}
848960

849961
void ConnectionAutomaton::writePayload(
@@ -854,7 +966,7 @@ void ConnectionAutomaton::writePayload(
854966
streamId,
855967
FrameFlags::NEXT | (complete ? FrameFlags::COMPLETE : FrameFlags::EMPTY),
856968
std::move(payload));
857-
outputFrameOrEnqueue(frameSerializer().serializeOut(std::move(frame)));
969+
outputFrameOrEnqueue(frameSerializer_->serializeOut(std::move(frame)));
858970
}
859971

860972
void ConnectionAutomaton::writeCloseStream(
@@ -864,21 +976,21 @@ void ConnectionAutomaton::writeCloseStream(
864976
switch (signal) {
865977
case StreamCompletionSignal::COMPLETE:
866978
outputFrameOrEnqueue(
867-
frameSerializer().serializeOut(Frame_PAYLOAD::complete(streamId)));
979+
frameSerializer_->serializeOut(Frame_PAYLOAD::complete(streamId)));
868980
break;
869981

870982
case StreamCompletionSignal::CANCEL:
871983
outputFrameOrEnqueue(
872-
frameSerializer().serializeOut(Frame_CANCEL(streamId)));
984+
frameSerializer_->serializeOut(Frame_CANCEL(streamId)));
873985
break;
874986

875987
case StreamCompletionSignal::ERROR:
876-
outputFrameOrEnqueue(frameSerializer().serializeOut(
988+
outputFrameOrEnqueue(frameSerializer_->serializeOut(
877989
Frame_ERROR::error(streamId, std::move(payload))));
878990
break;
879991

880992
case StreamCompletionSignal::APPLICATION_ERROR:
881-
outputFrameOrEnqueue(frameSerializer().serializeOut(
993+
outputFrameOrEnqueue(frameSerializer_->serializeOut(
882994
Frame_ERROR::applicationError(streamId, std::move(payload))));
883995
break;
884996

0 commit comments

Comments
 (0)