Skip to content

Commit 4a30be8

Browse files
committed
fix(config): onStoppedNotification and onUnhandledError will now always async dispatch
Resolves #7343
1 parent 0bd47ea commit 4a30be8

File tree

3 files changed

+88
-94
lines changed

3 files changed

+88
-94
lines changed

packages/rxjs/spec/operators/share-spec.ts

Lines changed: 86 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -18,25 +18,12 @@ import {
1818
} from 'rxjs/operators';
1919
import { TestScheduler } from 'rxjs/testing';
2020
import { observableMatcher } from '../helpers/observableMatcher';
21-
import { SinonSpy, spy } from 'sinon';
21+
import { spy } from 'sinon';
2222

2323
const syncNotify = of(1);
2424
const asapNotify = scheduled(syncNotify, asapScheduler);
2525
const syncError = throwError(() => new Error());
2626

27-
function spyOnUnhandledError(fn: (spy: SinonSpy) => void): void {
28-
const prevOnUnhandledError = config.onUnhandledError;
29-
30-
try {
31-
const onUnhandledError = spy();
32-
config.onUnhandledError = onUnhandledError;
33-
34-
fn(onUnhandledError);
35-
} finally {
36-
config.onUnhandledError = prevOnUnhandledError;
37-
}
38-
}
39-
4027
/** @test {share} */
4128
describe('share', () => {
4229
let rxTest: TestScheduler;
@@ -810,9 +797,22 @@ describe('share', () => {
810797
});
811798
});
812799

813-
it('should not reset on refCount 0 if reset notifier errors before emitting any value', () => {
814-
spyOnUnhandledError((onUnhandledError) => {
800+
describe('when config.onUnhandledError is set', () => {
801+
afterEach(() => {
802+
config.onUnhandledError = null;
803+
});
804+
805+
it('should not reset on refCount 0 if reset notifier errors before emitting any value', (done) => {
815806
const error = new Error();
807+
let calls = 0;
808+
809+
config.onUnhandledError = spy((err) => {
810+
calls++;
811+
expect(err).to.equal(error);
812+
if (calls === 2) {
813+
done();
814+
}
815+
});
816816

817817
rxTest.run(({ hot, cold, expectObservable, expectSubscriptions }) => {
818818
const source = hot(' ---1---2---3---4---(5 )---|');
@@ -830,17 +830,16 @@ describe('share', () => {
830830
expectObservable(result, subscription).toBe(expected);
831831
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
832832
});
833-
834-
expect(onUnhandledError).to.have.been.calledTwice;
835-
expect(onUnhandledError.getCall(0)).to.have.been.calledWithExactly(error);
836-
expect(onUnhandledError.getCall(1)).to.have.been.calledWithExactly(error);
837833
});
838-
});
839834

840-
it('should not reset on error if reset notifier errors before emitting any value', () => {
841-
spyOnUnhandledError((onUnhandledError) => {
835+
it('should not reset on error if reset notifier errors before emitting any value', (done) => {
842836
const error = new Error();
843837

838+
config.onUnhandledError = spy((err) => {
839+
expect(err).to.equal(error);
840+
done();
841+
});
842+
844843
rxTest.run(({ cold, expectObservable, expectSubscriptions }) => {
845844
const source = cold(' ---1---2---# ');
846845
const sourceSubs = ' ^----------! ';
@@ -856,89 +855,86 @@ describe('share', () => {
856855
expectObservable(result, subscription).toBe(expected);
857856
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
858857
});
859-
860-
expect(onUnhandledError).to.have.been.calledOnce;
861-
expect(onUnhandledError.getCall(0)).to.have.been.calledWithExactly(error);
862858
});
863859
});
864860

865-
it('should not reset on complete if reset notifier errors before emitting any value', () => {
866-
spyOnUnhandledError((onUnhandledError) => {
867-
const error = new Error();
868-
869-
rxTest.run(({ cold, expectObservable, expectSubscriptions }) => {
870-
const source = cold(' ---1---2---| ');
871-
const sourceSubs = ' ^----------! ';
872-
const expected = ' ---1---2------|';
873-
const subscription = ' ^--------------';
874-
const firstPause = cold(' -------|');
875-
const reset = cold(' --# ', undefined, error);
876-
877-
const sharedSource = source.pipe(share({ resetOnComplete: () => reset, resetOnRefCountZero: false }), take(2));
861+
it('should not reset on complete if reset notifier errors before emitting any value', (done) => {
862+
const error = new Error();
878863

879-
const result = concat(sharedSource, firstPause, sharedSource);
880-
881-
expectObservable(result, subscription).toBe(expected);
882-
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
883-
});
884-
885-
expect(onUnhandledError).to.have.been.calledOnce;
886-
expect(onUnhandledError.getCall(0)).to.have.been.calledWithExactly(error);
864+
config.onUnhandledError = spy((err) => {
865+
expect(err).to.equal(error);
866+
done();
887867
});
888-
});
889868

890-
it('should not call "resetOnRefCountZero" on error', () => {
891869
rxTest.run(({ cold, expectObservable, expectSubscriptions }) => {
892-
const resetOnRefCountZero = spy(() => EMPTY);
893-
894-
const source = cold(' ---1---(2#) ');
895-
// source: ' ---1---(2#) '
896-
const sourceSubs = [
897-
' ^------(! ) ',
898-
// break the line, please
899-
' -------(- )---^------(! ) ',
900-
];
901-
const expected = ' ---1---(2 )------1---(2#) ';
902-
const subscription = ' ^------(- )----------(- ) ';
903-
const firstPause = cold(' (- )---| ');
904-
const reset = cold(' (- )-r ');
905-
// reset: ' (- )-r'
870+
const source = cold(' ---1---2---| ');
871+
const sourceSubs = ' ^----------! ';
872+
const expected = ' ---1---2------|';
873+
const subscription = ' ^--------------';
874+
const firstPause = cold(' -------|');
875+
const reset = cold(' --# ', undefined, error);
906876

907-
const sharedSource = source.pipe(share({ resetOnError: () => reset, resetOnRefCountZero }));
877+
const sharedSource = source.pipe(share({ resetOnComplete: () => reset, resetOnRefCountZero: false }), take(2));
908878

909-
const result = concat(sharedSource.pipe(onErrorResumeNextWith(firstPause)), sharedSource);
879+
const result = concat(sharedSource, firstPause, sharedSource);
910880

911881
expectObservable(result, subscription).toBe(expected);
912882
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
913-
expect(resetOnRefCountZero).to.not.have.been.called;
914883
});
915884
});
885+
});
916886

917-
it('should not call "resetOnRefCountZero" on complete', () => {
918-
rxTest.run(({ cold, expectObservable, expectSubscriptions }) => {
919-
const resetOnRefCountZero = spy(() => EMPTY);
920-
921-
const source = cold(' ---1---(2|) ');
922-
// source: ' ---1---(2|) '
923-
const sourceSubs = [
924-
' ^------(! ) ',
925-
// break the line, please
926-
' -------(- )---^------(! ) ',
927-
];
928-
const expected = ' ---1---(2 )------1---(2|) ';
929-
const subscription = ' ^------(- )----------(- ) ';
930-
const firstPause = cold(' (- )---| ');
931-
const reset = cold(' (- )-r ');
932-
// reset: ' (- )-r'
933-
934-
const sharedSource = source.pipe(share({ resetOnComplete: () => reset, resetOnRefCountZero }));
935-
936-
const result = concat(sharedSource, firstPause, sharedSource);
887+
it('should not call "resetOnRefCountZero" on error', () => {
888+
rxTest.run(({ cold, expectObservable, expectSubscriptions }) => {
889+
const resetOnRefCountZero = spy(() => EMPTY);
890+
891+
const source = cold(' ---1---(2#) ');
892+
// source: ' ---1---(2#) '
893+
const sourceSubs = [
894+
' ^------(! ) ',
895+
// break the line, please
896+
' -------(- )---^------(! ) ',
897+
];
898+
const expected = ' ---1---(2 )------1---(2#) ';
899+
const subscription = ' ^------(- )----------(- ) ';
900+
const firstPause = cold(' (- )---| ');
901+
const reset = cold(' (- )-r ');
902+
// reset: ' (- )-r'
903+
904+
const sharedSource = source.pipe(share({ resetOnError: () => reset, resetOnRefCountZero }));
905+
906+
const result = concat(sharedSource.pipe(onErrorResumeNextWith(firstPause)), sharedSource);
907+
908+
expectObservable(result, subscription).toBe(expected);
909+
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
910+
expect(resetOnRefCountZero).to.not.have.been.called;
911+
});
912+
});
937913

938-
expectObservable(result, subscription).toBe(expected);
939-
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
940-
expect(resetOnRefCountZero).to.not.have.been.called;
941-
});
914+
it('should not call "resetOnRefCountZero" on complete', () => {
915+
rxTest.run(({ cold, expectObservable, expectSubscriptions }) => {
916+
const resetOnRefCountZero = spy(() => EMPTY);
917+
918+
const source = cold(' ---1---(2|) ');
919+
// source: ' ---1---(2|) '
920+
const sourceSubs = [
921+
' ^------(! ) ',
922+
// break the line, please
923+
' -------(- )---^------(! ) ',
924+
];
925+
const expected = ' ---1---(2 )------1---(2|) ';
926+
const subscription = ' ^------(- )----------(- ) ';
927+
const firstPause = cold(' (- )---| ');
928+
const reset = cold(' (- )-r ');
929+
// reset: ' (- )-r'
930+
931+
const sharedSource = source.pipe(share({ resetOnComplete: () => reset, resetOnRefCountZero }));
932+
933+
const result = concat(sharedSource, firstPause, sharedSource);
934+
935+
expectObservable(result, subscription).toBe(expected);
936+
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
937+
expect(resetOnRefCountZero).to.not.have.been.called;
942938
});
943939
});
944940
});

packages/rxjs/src/internal/Subscriber.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import { Subscription } from './Subscription';
44
import { config } from './config';
55
import { reportUnhandledError } from './util/reportUnhandledError';
66
import { nextNotification, errorNotification, COMPLETE_NOTIFICATION } from './NotificationFactories';
7-
import { timeoutProvider } from './scheduler/timeoutProvider';
87

98
export interface SubscriberOverrides<T> {
109
/**
@@ -267,7 +266,7 @@ function createSafeObserver<T>(observerOrNext?: Partial<Observer<T>> | ((value:
267266
*/
268267
function handleStoppedNotification(notification: ObservableNotification<any>, subscriber: Subscriber<any>) {
269268
const { onStoppedNotification } = config;
270-
onStoppedNotification && timeoutProvider.setTimeout(() => onStoppedNotification(notification, subscriber));
269+
onStoppedNotification && setTimeout(() => onStoppedNotification(notification, subscriber));
271270
}
272271

273272
function hasAddAndUnsubscribe(value: any): value is Subscription {

packages/rxjs/src/internal/util/reportUnhandledError.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import { config } from '../config';
2-
import { timeoutProvider } from '../scheduler/timeoutProvider';
32

43
/**
54
* Handles an error on another job either with the user-configured {@link onUnhandledError},
@@ -11,7 +10,7 @@ import { timeoutProvider } from '../scheduler/timeoutProvider';
1110
* @param err the error to report
1211
*/
1312
export function reportUnhandledError(err: any) {
14-
timeoutProvider.setTimeout(() => {
13+
setTimeout(() => {
1514
const { onUnhandledError } = config;
1615
if (onUnhandledError) {
1716
// Execute the user-configured error handler.

0 commit comments

Comments
 (0)