US 3463 - Implement heartbeat protocol for taiga-events

stable
David Barragán Merino 2015-11-06 14:28:41 +01:00 committed by Juanfran
parent 99a74cd8fd
commit b489767d84
3 changed files with 72 additions and 8 deletions

View File

@ -2,6 +2,8 @@ version = ___VERSION___
window.taigaConfig = { window.taigaConfig = {
"api": "http://localhost:8000/api/v1/", "api": "http://localhost:8000/api/v1/",
"eventsUrl": null, "eventsUrl": null,
"eventsMaxMissedHeartbeats": 5,
"eventsHeartbeatIntervalTime": 60000,
"debug": true, "debug": true,
"defaultLanguage": "en", "defaultLanguage": "en",
"themes": ["taiga", "material-design", "high-contrast"], "themes": ["taiga", "material-design", "high-contrast"],

View File

@ -37,6 +37,9 @@ class EventsService
@.error = false @.error = false
@.pendingMessages = [] @.pendingMessages = []
@.missedHeartbeats = 0
@.heartbeatInterval = null
if @win.WebSocket is undefined if @win.WebSocket is undefined
@log.info "WebSockets not supported on your browser" @log.info "WebSockets not supported on your browser"
@ -70,10 +73,53 @@ class EventsService
@.ws.removeEventListener("close", @.onClose) @.ws.removeEventListener("close", @.onClose)
@.ws.removeEventListener("error", @.onError) @.ws.removeEventListener("error", @.onError)
@.ws.removeEventListener("message", @.onMessage) @.ws.removeEventListener("message", @.onMessage)
@.stopHeartBeatMessages()
@.ws.close() @.ws.close()
delete @.ws delete @.ws
###########################################
# Heartbeat (Ping - Pong)
###########################################
# See RFC https://tools.ietf.org/html/rfc6455#section-5.5.2
# RFC https://tools.ietf.org/html/rfc6455#section-5.5.3
startHeartBeatMessages: ->
return if @.heartbeatInterval
maxMissedHeartbeats = @config.get("eventsMaxMissedHeartbeats", 5)
heartbeatIntervalTime = @config.get("eventsHeartbeatIntervalTime", 60000)
@.missedHeartbeats = 0
@.heartbeatInterval = setInterval(() =>
try
if @.missedHeartbeats >= maxMissedHeartbeats
throw new Error("Too many missed heartbeats PINGs.")
@.missedHeartbeats++
@.sendMessage({cmd: "ping"})
@log.debug("HeartBeat send PING")
catch e
@log.error("HeartBeat error: " + e.message)
@.stopHeartBeatMessages()
, heartbeatIntervalTime)
@log.debug("HeartBeat enabled")
stopHeartBeatMessages: ->
return if not @.heartbeatInterval
clearInterval(@.heartbeatInterval)
@.heartbeatInterval = null
@log.debug("HeartBeat disabled")
processHeartBeatPongMessage: (data) ->
@.missedHeartbeats = 0
@log.debug("HeartBeat recived PONG")
###########################################
# Messages
###########################################
serialize: (message) -> serialize: (message) ->
if _.isObject(message) if _.isObject(message)
return JSON.stringify(message) return JSON.stringify(message)
@ -91,6 +137,19 @@ class EventsService
for msg in messages for msg in messages
@.ws.send(msg) @.ws.send(msg)
processMesage: (data) =>
routingKey = data.routing_key
if not @.subscriptions[routingKey]?
return
subscription = @.subscriptions[routingKey]
subscription.scope.$apply ->
subscription.callback(data.data)
###########################################
# Subscribe and Unsubscribe
###########################################
subscribe: (scope, routingKey, callback) -> subscribe: (scope, routingKey, callback) ->
if @.error if @.error
return return
@ -124,8 +183,12 @@ class EventsService
@.sendMessage(message) @.sendMessage(message)
###########################################
# Event listeners
###########################################
onOpen: -> onOpen: ->
@.connected = true @.connected = true
@.startHeartBeatMessages()
@log.debug("WebSocket connection opened") @log.debug("WebSocket connection opened")
token = @auth.getToken() token = @auth.getToken()
@ -141,14 +204,10 @@ class EventsService
@.log.debug "WebSocket message received: #{event.data}" @.log.debug "WebSocket message received: #{event.data}"
data = JSON.parse(event.data) data = JSON.parse(event.data)
routingKey = data.routing_key if data.cmd = "pong"
@.processHeartBeatPongMessage(data)
if not @.subscriptions[routingKey]? else
return @.processMessage(data)
subscription = @.subscriptions[routingKey]
subscription.scope.$apply ->
subscription.callback(data.data)
onError: (error) -> onError: (error) ->
@log.error("WebSocket error: #{error}") @log.error("WebSocket error: #{error}")
@ -157,6 +216,7 @@ class EventsService
onClose: -> onClose: ->
@log.debug("WebSocket closed.") @log.debug("WebSocket closed.")
@.connected = false @.connected = false
@.stopHeartBeatMessages()
class EventsProvider class EventsProvider

View File

@ -1,6 +1,8 @@
{ {
"api": "http://localhost:8000/api/v1/", "api": "http://localhost:8000/api/v1/",
"eventsUrl": null, "eventsUrl": null,
"eventsMaxMissedHeartbeats": 5,
"eventsHeartbeatIntervalTime": 60000,
"debug": true, "debug": true,
"debugInfo": false, "debugInfo": false,
"defaultLanguage": "en", "defaultLanguage": "en",