From cc149d4025fb7d32830d9a3b426609baf0ac3c74 Mon Sep 17 00:00:00 2001 From: Juanfran Date: Thu, 19 Feb 2015 15:20:10 +0100 Subject: [PATCH] new taiga events - one queue per subription - one channel per websocket connection - one consumer per subscription --- client.coffee | 50 +++++++++++++++++++ clients.coffee | 36 -------------- config.json | 11 +---- index.coffee | 51 +------------------- package.json | 4 +- rabbit.coffee | 115 ++++++++++++++++++++++++++++++++++++++++++++ signing.coffee | 30 ++++++++++++ subscription.coffee | 46 ++++++++++++++++++ 8 files changed, 248 insertions(+), 95 deletions(-) create mode 100644 client.coffee delete mode 100644 clients.coffee create mode 100644 rabbit.coffee create mode 100644 signing.coffee create mode 100644 subscription.coffee diff --git a/client.coffee b/client.coffee new file mode 100644 index 0000000..6601a6b --- /dev/null +++ b/client.coffee @@ -0,0 +1,50 @@ +uuid = require('node-uuid') +signing = require('./signing') +SubscriptionManager = require('./subscription').SubscriptionManager + +clients = {} + +class Client + constructor: (@ws) -> + @id = uuid.v4() + + @handleEvents() + + handleEvents: () -> + @ws.on 'message', @handleMessage.bind(@) + + handleMessage: (message) -> + msg = JSON.parse(message) + + if msg.cmd == 'auth' + @auth(msg.data) + else if msg.cmd == 'subscribe' + @addSubscription(msg.routing_key) + else if msg.cmd == 'unsubscribe' + @removeSubscription(msg.routing_key) + + auth: (auth) -> + if auth.token and auth.sessionId and signing.verify(auth.token) + @auth = auth + + addSubscription: (routing_key) -> + if @auth + if !@subscriptionManager + @subscriptionManager = new SubscriptionManager(@id, @auth, @ws) + @subscriptionManager.add(routing_key) + + close: () -> + if @subscriptionManager + @subscriptionManager.destroy() + + removeSubscription: (routing_key) -> + if @subscriptionManager + @subscriptionManager.remove(routing_key) + +exports.createClient = (ws) -> + client = new Client(ws) + clients[client.id] = client + client.ws.on 'close', (() -> + @.close() + delete clients[@id] + ).bind(client) diff --git a/clients.coffee b/clients.coffee deleted file mode 100644 index 7e0890c..0000000 --- a/clients.coffee +++ /dev/null @@ -1,36 +0,0 @@ -clients = [] -subscriptions = {} - -removeSubscription = (id, routing_key) -> - if subscriptions[routing_key] - subscriptions[routing_key] = subscriptions[routing_key].filter (client) -> client.id != id - -exports.removeById = (id) -> - clients = clients.filter (client) -> client.id != id - - Object.keys(subscriptions).forEach (routing_key) -> - removeSubscription(id, routing_key) - -exports.getBySessionId = (session_id) -> - client = clients.filter (client) -> client.auth.sessionId == session_id - - return client[0] - -exports.add = (client) -> - clients.push(client) - -exports.addSubscription = (id, routing_key) -> - if !subscriptions[routing_key] - subscriptions[routing_key] = [] - - client = clients.filter (client) -> client.id == id - - subscriptions[routing_key].push(client[0]) - -exports.removeSubscription = removeSubscription - -exports.getBySubscription = (subscription) -> - if !subscriptions[subscription] - return [] - - return subscriptions[subscription] diff --git a/config.json b/config.json index 239d686..3d2bd84 100644 --- a/config.json +++ b/config.json @@ -1,14 +1,7 @@ { "url": "amqp://guest:guest@localhost:5672", - "exchange": { - "name": "events", - "options": {} - }, - "queue": { - "name": "EventsPushBackend", - "options": {} - }, + "secret": "mysecret", "webSocketServer": { - "port": 3000 + "port": 8888 } } diff --git a/index.coffee b/index.coffee index a563008..0fc60ce 100644 --- a/index.coffee +++ b/index.coffee @@ -1,55 +1,8 @@ -amqp = require('amqp') -uuid = require('node-uuid') - config = require('./config') -clients = require('./clients') - -connection = amqp.createConnection({ url: config.url }, {defaultExchangeName: config.exchange.name}) +client = require('./client') WebSocketServer = require('ws').Server wss = new WebSocketServer(config.webSocketServer) -connection.on 'ready', () -> - connection.queue config.queue.name, config.queue.options, (q) -> - console.log 'Queue ' + q.name + ' is open' - - exc = connection.exchange config.exchange.name, config.exchange.options, (exchange) -> - console.log 'Exchange ' + exchange.name + ' is open' - - q.bind(config.exchange.name, '#') - - q.subscribe (msg, header, deliveryInfo) -> - msg = JSON.parse(msg.data.toString()) - - clientMsg = msg.data - clientMsg.routing_key = deliveryInfo.routingKey - - senderClient = clients.getBySessionId(msg.session_id) - subscriptions = clients.getBySubscription(deliveryInfo.routingKey) - - clientMsgStr = JSON.stringify(clientMsg) - - subscriptions.forEach (client) -> - #exclude sender client - if !senderClient || client.id != senderClient.id - client.ws.send clientMsgStr - wss.on 'connection', (ws) -> - clientId = uuid.v4() - - ws.on 'message', (message) -> - msg = JSON.parse(message) - - if msg.cmd == 'auth' - clients.add({ - id: clientId, - ws: ws, - auth: msg.data - }) - else if msg.cmd == 'subscribe' - clients.addSubscription(clientId, msg.routing_key) - else if msg.cmd == 'unsubscribe' - clients.removeSubscription(clientId, msg.routing_key) - - ws.on 'close', (message) -> - clients.removeById(clientId) + client.createClient(ws) diff --git a/package.json b/package.json index 370f512..f9850f9 100644 --- a/package.json +++ b/package.json @@ -24,7 +24,9 @@ "gulp-plumber": "^0.6.6" }, "dependencies": { - "amqp": "^0.2.3", + "amqplib": "^0.3.1", + "base64-url": "^1.2.1", + "bluebird": "^2.9.10", "node-uuid": "^1.4.2", "ws": "^0.7.1" } diff --git a/rabbit.coffee b/rabbit.coffee new file mode 100644 index 0000000..e2a8a33 --- /dev/null +++ b/rabbit.coffee @@ -0,0 +1,115 @@ +amqp = require('amqplib') +Promise = require('bluebird') +amqpUrl = require('./config').url + +config = { + "exchange": { + "name": "events", + "type": "topic", + "options": { + "durable": false, + "autoDelete": true + } + }, + "queue": { + "name": "" + "options": { + "autoDelete": true, + "exclusive": true + } + }, + "channel": { + noAck: true + } +} + +# Return the connection, creates the connection if it does not exist. +getConnection = do -> + connection = null + + return () -> + return new Promise (resolve, reject) -> + if (!connection) + amqp.connect(amqpUrl).then (conn) -> + connection = conn + resolve(connection) + else + resolve(connection) + +# Return the user channel +channels = do -> + chs = {} + + removeClient = (client_id) -> + get(client_id).then (channel) -> + channel.close() + + delete chs[client_id] + + get = (client_id) -> + return new Promise (resolve, reject) -> + if !chs[client_id] + getConnection() + .then (connection) -> connection.createChannel() + .then (channel) -> + chs[client_id] = channel + return resolve(chs[client_id]) + else + resolve(chs[client_id]) + + return { + removeClient: removeClient + get: get + } + +# Return a new queue +queues = do -> + getExchange = (channel) -> + return channel.assertExchange(config.exchange.name, config.exchange.type, config.exchange.options) + + getQueue = (channel, exchange) -> + return channel.assertQueue(config.queue.name, config.queue.options).then (qok) -> qok.queue + + return { + create: (channel, client_id, routing_key) -> + return getExchange(channel) + .then (exchange) -> getQueue(channel) + } + +subscriptions = do -> + subs = {} + + bindAndSubscribe = (channel, queue, routing_key, cb) -> + channel.bindQueue(queue, config.exchange.name, routing_key) + return channel.consume(queue, cb, {noAck: true}) + + registerSubscription = (client_id, routing_key, consumerTag) -> + subs[client_id] = subs[client_id] || {} + subs[client_id][routing_key] = consumerTag + + subscribe = (client_id, routing_key, cb) -> + channels.get(client_id) + .then (channel) -> + queues.create(channel) + .then (queue) -> bindAndSubscribe(channel, queue, routing_key, cb) + .then (ok) -> registerSubscription(client_id, routing_key, ok.consumerTag) + + unsubscribe = (client_id, routing_key) -> + channels.get(client_id).then (channel) -> + channel.cancel(subs[client_id][routing_key]) + + removeClient = (client_id) -> + delete subs[client_id] + + return { + subscribe: subscribe + unsubscribe: unsubscribe + removeClient: removeClient + } + +exports.destroy = (client_id) -> + subscriptions.removeClient(client_id) + channels.removeClient(client_id) + +exports.subscribe = subscriptions.subscribe +exports.unsubscribe = subscriptions.unsubscribe diff --git a/signing.coffee b/signing.coffee new file mode 100644 index 0000000..a392355 --- /dev/null +++ b/signing.coffee @@ -0,0 +1,30 @@ +crypto = require('crypto') +base64url = require('base64-url') +config = require('./config') + +salt = 'django.core.signing' + +rsplit = (token, sep, maxsplit) -> + split = token.split(sep) + + if maxsplit + return [ split.slice(0, -maxsplit).join(sep) ].concat(split.slice(-maxsplit)) + + return split + +exports.verify = (token) -> + [value, sig] = rsplit(token, ':', 1) + + shasum = crypto.createHash('sha1') + shasum.update(salt + 'signer' + config.secret) + + hmacKey = shasum.digest() + + hmac = crypto.createHmac('sha1', hmacKey) + + hmac.setEncoding('base64') + hmac.update(value) + + key = base64url.escape(hmac.digest('base64')) + + return key == sig diff --git a/subscription.coffee b/subscription.coffee new file mode 100644 index 0000000..7453579 --- /dev/null +++ b/subscription.coffee @@ -0,0 +1,46 @@ +queue = require('./rabbit') + +class Subscription + constructor: (@client_id, @auth, @ws, @routing_key) -> + + handleMessage: (msg) -> + content = JSON.parse(msg.content.toString()) + + if content.session_id == @auth.sessionId + return + + clientMsg = content + clientMsg.routing_key = msg.fields.routingKey + clientMsgStr = JSON.stringify(clientMsg) + + @ws.send clientMsgStr + + start: () -> + queue.subscribe(@client_id, @routing_key, @handleMessage.bind(@)) + + stop: () -> + queue.unsubscribe(@client_id, @routing_key) + +class SubscriptionManager + constructor: (@client_id, @auth, @ws) -> + @subscriptions = {} + + add: (routing_key) -> + if !@subscriptions[routing_key] + @subscriptions[routing_key] = {} + else + @subscriptions[routing_key].stop() + + @subscriptions[routing_key] = new Subscription(@client_id, @auth, @ws, routing_key) + @subscriptions[routing_key].start() + + remove: (routing_key) -> + @subscriptions[routing_key].stop() + + delete @subscriptions[routing_key] + + destroy: () -> + @subscriptions = {} + queue.destroy(@client_id) + +exports.SubscriptionManager = SubscriptionManager