taiga-events/rabbit.coffee

124 lines
3.4 KiB
CoffeeScript

amqp = require('amqplib')
Promise = require('bluebird')
amqpUrl = require('./config').url
config = {
"exchange": {
"name": "events",
"type": "topic",
"options": {
"durable": false,
"autoDelete": true
}
},
"queue": {
"name": ""
"options": {
"autoDelete": true,
"exclusive": true
}
},
"channel": {
noAck: true
}
}
# Return the connection, creates the connection if it does not exist.
getConnection = do ->
connection = null
return () ->
return new Promise (resolve, reject) ->
if (!connection)
amqp.connect(amqpUrl).then (conn) ->
connection = conn
resolve(connection)
else
resolve(connection)
# Return the user channel
channels = do ->
chs = {}
pendingChannels = {}
removeClient = (client_id) ->
get(client_id).then (channel) ->
channel.close()
delete chs[client_id]
get = (client_id) ->
if pendingChannels[client_id]
return pendingChannels[client_id]
pendingChannels[client_id] = new Promise (resolve, reject) ->
delete pendingChannels[client_id]
if !chs[client_id]
getConnection()
.then (connection) -> connection.createChannel()
.then (channel) ->
chs[client_id] = channel
return resolve(chs[client_id])
else
resolve(chs[client_id])
return pendingChannels[client_id]
return {
removeClient: removeClient
get: get
}
# Return a new queue
queues = do ->
getExchange = (channel) ->
return channel.assertExchange(config.exchange.name, config.exchange.type, config.exchange.options)
getQueue = (channel, exchange) ->
return channel.assertQueue(config.queue.name, config.queue.options).then (qok) -> qok.queue
return {
create: (channel, client_id, routing_key) ->
return getExchange(channel)
.then (exchange) -> getQueue(channel)
}
subscriptions = do ->
subs = {}
bindAndSubscribe = (channel, queue, routing_key, cb) ->
channel.bindQueue(queue, config.exchange.name, routing_key)
return channel.consume(queue, cb, {noAck: true})
registerSubscription = (client_id, routing_key, consumerTag) ->
subs[client_id] = subs[client_id] || {}
subs[client_id][routing_key] = consumerTag
subscribe = (client_id, routing_key, cb) ->
channels.get(client_id)
.then (channel) ->
queues.create(channel)
.then (queue) -> bindAndSubscribe(channel, queue, routing_key, cb)
.then (ok) -> registerSubscription(client_id, routing_key, ok.consumerTag)
unsubscribe = (client_id, routing_key) ->
channels.get(client_id).then (channel) ->
channel.cancel(subs[client_id][routing_key])
removeClient = (client_id) ->
delete subs[client_id]
return {
subscribe: subscribe
unsubscribe: unsubscribe
removeClient: removeClient
}
exports.destroy = (client_id) ->
subscriptions.removeClient(client_id)
channels.removeClient(client_id)
exports.subscribe = subscriptions.subscribe
exports.unsubscribe = subscriptions.unsubscribe