Skip to content
This repository was archived by the owner on Apr 8, 2024. It is now read-only.

Commit ced6b5e

Browse files
authored
Merge pull request #156 from jwulf/0.23.0-alpha.5
Fixes #151
2 parents 3d751f3 + 7f3ef07 commit ced6b5e

File tree

5 files changed

+17
-16
lines changed

5 files changed

+17
-16
lines changed

package-lock.json

+1-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "zeebe-node",
3-
"version": "v0.23.0-alpha.4",
3+
"version": "v0.23.0-alpha.5",
44
"description": "A Node.js client library for the Zeebe Microservices Orchestration Engine.",
55
"keywords": [
66
"zeebe",

src/__tests__/StdOut.spec.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@ jest.setTimeout(15000)
66
describe('StdOut Substitution', () => {
77
it('uses an injected stdout', done => {
88
const mockStd = new MockStdOut()
9-
const z = new ZBClient({ stdout: mockStd })
9+
const z = new ZBClient({ stdout: mockStd, eagerConnection: false })
1010

1111
// tslint:disable-next-line: no-console
12-
z.createWorker(null, 'test', console.log)
12+
z.createWorker('test', console.log)
1313
setTimeout(() => {
1414
z.close()
1515
}, 2000)

src/lib/GrpcClient.ts

+9-8
Original file line numberDiff line numberDiff line change
@@ -440,14 +440,15 @@ export class GrpcClient extends EventEmitter {
440440
return metadata
441441
}
442442

443-
private watchGrpcChannel(): Promise<number> {
443+
private waitForGrpcChannelReconnect(): Promise<number> {
444444
this.emit(MiddlewareSignals.Log.Debug, 'Start watching Grpc channel...')
445445
return new Promise(resolve => {
446+
const tryToConnect = true
446447
const gRPC = this.client
447448
if (this.channelClosed) {
448449
return
449450
}
450-
const state = gRPC.getChannel().getConnectivityState(false)
451+
const state = gRPC.getChannel().getConnectivityState(tryToConnect)
451452
this.emit(
452453
MiddlewareSignals.Log.Error,
453454
`Grpc Channel State: ${connectivityState[state]}`
@@ -473,7 +474,7 @@ export class GrpcClient extends EventEmitter {
473474
}
474475
const newState = gRPC
475476
.getChannel()
476-
.getConnectivityState(false)
477+
.getConnectivityState(tryToConnect)
477478
this.emit(
478479
MiddlewareSignals.Log.Error,
479480
`Grpc Channel State: ${connectivityState[newState]}`
@@ -484,16 +485,15 @@ export class GrpcClient extends EventEmitter {
484485
)
485486
if (
486487
newState === GrpcState.READY ||
487-
newState === GrpcState.IDLE ||
488-
newState === GrpcState.TRANSIENT_FAILURE
488+
newState === GrpcState.IDLE
489489
) {
490490
return resolve(newState)
491491
} else {
492492
this.emit(
493493
MiddlewareSignals.Log.Error,
494494
`Grpc Retry count: ${this.gRPCRetryCount}`
495495
)
496-
return resolve(await this.watchGrpcChannel())
496+
return resolve(await this.waitForGrpcChannelReconnect())
497497
}
498498
}
499499
)
@@ -562,17 +562,18 @@ export class GrpcClient extends EventEmitter {
562562
private handleGrpcError = (stream: any) => async (err: any) => {
563563
this.emit(MiddlewareSignals.Event.Error)
564564
this.emit(MiddlewareSignals.Log.Error, `Grpc Error: ${err.message}`)
565-
const channelState = await this.watchGrpcChannel()
565+
const channelState = await this.waitForGrpcChannelReconnect()
566566
this.emit(
567567
MiddlewareSignals.Log.Debug,
568568
`Grpc Channel state: ${connectivityState[channelState]}`
569569
)
570570
stream.removeAllListeners()
571+
this.emit(MiddlewareSignals.Event.Ready)
571572
// if (
572573
// channelState === GrpcState.READY ||
573574
// channelState === GrpcState.IDLE
574575
// ) {
575-
// this.emit(MiddlewareSignals.Log.Info, 'Grpc channel connected')
576+
// this.emit(MiddlewareSignals.Log.Info, 'Grpc channel reconnected')
576577
// // tslint:disable-next-line: no-console
577578
// console.log('handleGrpc', channelState) // @DEBUG
578579

src/lib/ZBWorkerBase.ts

+4-4
Original file line numberDiff line numberDiff line change
@@ -203,8 +203,8 @@ export class ZBWorkerBase<
203203
await this.grpcClient.close(timeout)
204204
this.grpcClient.removeAllListeners()
205205
Object.keys(this.jobStreams).forEach(key => {
206-
this.jobStreams[key].removeAllListeners()
207-
this.jobStreams[key].cancel()
206+
this.jobStreams[key]?.removeAllListeners?.()
207+
this.jobStreams[key]?.cancel?.()
208208
delete this.jobStreams[key]
209209
this.logger.logDebug('Removed Job Stream Listeners')
210210
})
@@ -399,7 +399,7 @@ export class ZBWorkerBase<
399399
this.jobStreams[id] = jobStream.stream
400400
// This event happens when the server cancels the call after the deadline
401401
// And when it has completed a response with work
402-
jobStream.stream.on('end', () => {
402+
jobStream.stream.on?.('end', () => {
403403
this.logger.logDebug(
404404
`Stream ended after ${(Date.now() - start) / 1000} seconds`
405405
)
@@ -478,7 +478,7 @@ export class ZBWorkerBase<
478478
}
479479
}
480480

481-
stream.on('data', (res: ActivateJobsResponse) => {
481+
stream?.on?.('data', (res: ActivateJobsResponse) => {
482482
// If we are closing, don't start working on these jobs. They will have to be timed out by the server.
483483
if (this.closing) {
484484
return

0 commit comments

Comments
 (0)