From 8b94d6c411da74c16b9610e9f7606445c7837740 Mon Sep 17 00:00:00 2001 From: Daniel Howard Date: Fri, 20 Jun 2014 15:29:17 -0700 Subject: [PATCH] Use bi-directional events system instead of handcrafted messages. --- client/js/im.js | 315 ++++++++++++++++++++++++----------- server/app.js | 47 +++--- server/libs/packages.js | 86 ---------- server/libs/utils.js | 15 ++ server/middleware/im.js | 22 ++- server/middleware/im/hub.js | 48 +++--- server/middleware/im/user.js | 68 +++----- 7 files changed, 326 insertions(+), 275 deletions(-) diff --git a/client/js/im.js b/client/js/im.js index a6642b4..74bd942 100644 --- a/client/js/im.js +++ b/client/js/im.js @@ -47,6 +47,7 @@ AjaxIM = function(options, actions) { // requests rather than POST requests (such as how the Node.JS Ajax IM // server works). this.actions = $.extend({ + noop: this.settings.pollServer + '/app/noop', listen: this.settings.pollServer + '/app/listen', send: this.settings.pollServer + '/app/message', status: this.settings.pollServer + '/app/status', @@ -258,6 +259,13 @@ AjaxIM = function(options, actions) { self._scrollers(); } catch(e) {} }); + + // Set up event handling + this.onEvent('hello', this.onHello); + this.onEvent('message', this.onMessage); + this.onEvent('status', this.onStatus); + this.onEvent('notice', this.onNotice); + this.onEvent('goodbye', this.onGoodbye); } else { return AjaxIM.init(options); } @@ -403,66 +411,61 @@ $.extend(AjaxIM.prototype, { : Math.min(self._reconnectIn * 2, 16000); self._lastReconnect = new Date(); setTimeout(function() { self.listen(); }, self._reconnectIn); - } + }, + this.actions.noop ); }, // === //private// {{{AjaxIM.}}}**{{{_parseMessages(messages)}}}** === // _parseMessage: function(message) { + this.triggerEvent(message); + }, + + onHello: function(message) { var self = this; - $(this).trigger('parseMessage', [message]); + this._clearSession(); - switch(message.type) { - case 'hello': - this._clearSession(); + this.username = message.username; + this.current_status = ['available', '']; + store.set('user', message.username); + store.set(this.username + '-status', this.current_status); - this.username = message.username; - this.current_status = ['available', '']; - store.set('user', message.username); - store.set(this.username + '-status', this.current_status); + $('#imjs-friends').attr('class', 'imjs-available'); + $.each(message.friends, function() { + var friend; + if(this.length == 2) + friend = this; + else + friend = [this.toString(), ['offline', '']]; + self.addFriend(friend[0], friend[1], 'Friends'); + }); + store.set(this.username + '-friends', this.friends); - $('#imjs-friends').attr('class', 'imjs-available'); - $.each(message.friends, function() { - var friend; - if(this.length == 2) - friend = this; - else - friend = [this.toString(), ['offline', '']]; - self.addFriend(friend[0], friend[1], 'Friends'); - }); - store.set(this.username + '-friends', this.friends); - - // Set username in Friends list - var header = $('#imjs-friends-panel .imjs-header'); - header.html(header.html().replace('{username}', this.username)); - - // Set status available - $('#imjs-away-message-text, #imjs-away-message-text-arrow').hide(); - $('#imjs-status-panel .imjs-button').removeClass('imjs-toggled'); - $('#imjs-button-available').addClass('imjs-toggled'); - break; + // Set username in Friends list + var header = $('#imjs-friends-panel .imjs-header'); + header.html(header.html().replace('{username}', this.username)); - case 'message': - this.incoming(message.user, message.body); - break; + // Set status available + $('#imjs-away-message-text, #imjs-away-message-text-arrow').hide(); + $('#imjs-status-panel .imjs-button').removeClass('imjs-toggled'); + $('#imjs-button-available').addClass('imjs-toggled'); + }, - case 'status': - this._friendUpdate(message.user, message.status, - message.message); - this._storeFriends(); - break; + onMessage: function(event) { + this.incoming(event.from, event.body); + }, - case 'notice': - break; - - case 'goodbye': - this._notConnected(); - break; + onStatus: function(event) { + this._friendUpdate(event.from, event.status, event.message); + this._storeFriends(); + }, - default: - break; - } + onNotice: function(event) { + }, + + onGoodbye: function(event) { + this._notConnected(); }, // === {{{AjaxIM.}}}**{{{incoming(from, message)}}}** === @@ -922,23 +925,17 @@ $.extend(AjaxIM.prototype, { $(this).trigger('sendingMessage', [username, body]); - AjaxIM.post( - this.actions.send, - {to: username, body: body}, - function(result) { - if(result.type == 'success' && result.success == 'sent') { - $(self).trigger('sendMessageSuccessful', - [username, body]); + var event = {type: 'message', to: username, body: body}; + this.sendEvent(event, function(result) { + if(result._status.send) { + $(self).trigger('sendMessageSuccessful', [username, body]); } else if(result.type == 'error') { if(result.error == 'not online') - $(self).trigger('sendMessageFailed', - ['offline', username, body]); + $(self).trigger('sendMessageFailed', ['offline', username, body]); else - $(self).trigger('sendMessageFailed', - [result.error, username, body]); + $(self).trigger('sendMessageFailed', [result.error, username, body]); } - }, - function(error) { + }, function(error) { self._notConnected(); var error = self._addError( self.chats[username], @@ -946,11 +943,9 @@ $.extend(AjaxIM.prototype, { 'server is not available. Please ensure ' + 'that you are signed in and try again.'); self._store(error); - $(self).trigger('sendMessageFailed', ['not connected', username, body]); - } - ); + }); }, // === {{{AjaxIM.}}}**{{{status(s, message)}}}** === @@ -999,31 +994,27 @@ $.extend(AjaxIM.prototype, { } ); } else { - AjaxIM.post( - this.actions.status, - {status: value, message: message}, - function(result) { - switch(result.type) { - case 'success': - $(self).trigger('changeStatusSuccessful', - [value, message]); - self.current_status = [value, message]; - store.set(self.username + '-status', - self.current_status); - break; - - case 'error': - default: - $(self).trigger('changeStatusFailed', - [result.e, value, message]); - break; - } - }, - function(error) { - $(self).trigger('changeStatusFailed', - ['not connected', value, message]); - } - ); + var event = {type: 'status', status: value, message: message}; + this.sendEvent(event, function(result) { + if(result._status.send) { + $(self).trigger('sendMessageSuccessful', [username, body]); + } else if(result.type == 'error') { + if(result.error == 'not online') + $(self).trigger('sendMessageFailed', ['offline', username, body]); + else + $(self).trigger('sendMessageFailed', [result.error, username, body]); + } + }, function(error) { + self._notConnected(); + var error = self._addError( + self.chats[username], + 'You are currently not connected or the ' + + 'server is not available. Please ensure ' + + 'that you are signed in and try again.'); + self._store(error); + $(self).trigger('sendMessageFailed', + ['not connected', username, body]); + }); } }, @@ -1332,6 +1323,97 @@ $.extend(AjaxIM.prototype, { $('#imjs-scroll-left').html(hiddenLeft); $('#imjs-scroll-right').html(hiddenRight); + }, + + unconfirmedEvents: {}, + eventId: 1, + + createEvent: function() { + var event = {}; + event.id = this.eventId++;; + this.unconfirmedEvents[event.id] = evt; + }, + + sendEvent: function(event, successFunc, failureFunc) { + event.id = this.eventId++; + var evt = $.extend({}, event); + evt['_status'] = { + successFunc: successFunc, + failureFunc: failureFunc + }; + this.unconfirmedEvents[event.id] = evt; + + var self = this; + var url = null; + switch (event.type) { + case 'message': + url = this.actions.send; + break; + case 'status': + url = this.actions.status; + break; + case 'signoff': + url = this.actions.signoff; + break; + default: + break; + } + + AjaxIM.post(url, event, + function(result) { + if (result) { + for (var e=0; e < result.length; ++e) { + self.dispatchEvent(events[e]); + } + } + }, + function(error) { + if (self.unconfirmedEvents[event.id]) { + event = self.unconfirmedEvents[event.id]; + event['_status']['sent'] = false; + self.dispatchEvent(event); + } + } + ); + }, + + dispatchEvent: function(event) { + if (this.unconfirmedEvents[event.id]) { + $.extend(event, this.unconfirmedEvents[event.id]); + delete this.unconfirmedEvents[event.id]; + console.log(JSON.stringify(event)); + if (event['_status']['sent']) { + event['_status']['successFunc'](event); + } else { + event['_status']['failureFunc'](event); + } + } else { + this.triggerEvent(event); + } + }, + + // poor man's Backbone.js Events + eventHandlers: {}, + + /** + * Add a callback to listen for an event type. + */ + onEvent: function(eventType, callback) { + if (!this.eventHandlers[eventType]) { + this.eventHandlers[eventType] = []; + } + this.eventHandlers[eventType].push(callback); + }, + + /** + * Trigger an event on all interested callbacks. + */ + triggerEvent: function(event) { + if (this.eventHandlers[event.type]) { + for (var e=0; e < this.eventHandlers[event.type].length; ++e) { + this.eventHandlers[event.type][e].call(this, event); + } + } } }) @@ -1360,7 +1442,7 @@ AjaxIM.init = function(options, actions) { AjaxIM.client = new AjaxIM(options, actions); return AjaxIM.client; -} +}; // === {{{AjaxIM.}}}**{{{request(url, data, successFunc, failureFunc)}}}** === @@ -1377,20 +1459,21 @@ AjaxIM.init = function(options, actions) { // {{{_ignore_}}} is simply to provide compatability with {{{$.post}}}. // {{{failure}}} is a callback function called when a request hasn't not // completed successfully. -AjaxIM.post = function(url, data, successFunc, failureFunc) { - AjaxIM.request(url, 'POST', data, successFunc, failureFunc); +AjaxIM.post = function(url, data, successFunc, failureFunc, urlnoop) { + AjaxIM.request(url, 'POST', data, successFunc, failureFunc, urlnoop); }; -AjaxIM.get = function(url, data, successFunc, failureFunc) { - AjaxIM.request(url, 'GET', data, successFunc, failureFunc); +AjaxIM.get = function(url, data, successFunc, failureFunc, urlnoop) { + AjaxIM.request(url, 'GET', data, successFunc, failureFunc, urlnoop); }; -AjaxIM.request = function(url, type, data, successFunc, failureFunc) { +AjaxIM.request = function(url, type, data, successFunc, failureFunc, noopurl) { var errorTypes = ['timeout', 'error', 'notmodified', 'parseerror']; if(typeof failureFunc != 'function') failureFunc = function(){}; var jsonp = (url.substring(0, 1) !== '/'); + var success = false; data['sessionid'] = cookies.get('sessionid'); $.ajax({ url: url, @@ -1400,6 +1483,7 @@ AjaxIM.request = function(url, type, data, successFunc, failureFunc) { cache: false, timeout: 299000 }).done(function(data) { + success = true; _dbg(JSON.stringify(data)); successFunc(data); }).fail(function(jqXHR, textStatus) { @@ -1407,6 +1491,47 @@ AjaxIM.request = function(url, type, data, successFunc, failureFunc) { failureFunc(textStatus); }); + if (jsonp) { + setTimeout(function() { + var failfn = function() { + if (!success) { + var textStatus = 'error'; + _dbg(textStatus); + failureFunc(textStatus); + } + }; + if (noopurl) { + var noopfn = function() { + var noopdone = false; + var event = {type: 'noop'}; + $.ajax({ + url: noopurl, + data: event, + dataType: 'jsonp', + type: type, + cache: false, + timeout: 299000 + }).done(function(data) { + noopdone = true; + if (!success) { + setTimeout(noopfn, 3000); + } + }).fail(function(jqXHR, textStatus) { + // since JSONP, never called + }); + setTimeout(function() { + if (!noopdone) { + failfn(); + } + }, 3000); + }; + noopfn(); + } else { + failfn(); + } + }, 3000); + } + // This prevents Firefox from spinning indefinitely // while it waits for a response. /* @@ -1433,7 +1558,9 @@ AjaxIM.incoming = function(data) { if(data.length) AjaxIM.client._parseMessages(data); -} +}; + +AjaxIM.eventID = 1; // === {{{AjaxIM.}}}**{{{l10n}}}** === // diff --git a/server/app.js b/server/app.js index 4bad472..4fa840f 100755 --- a/server/app.js +++ b/server/app.js @@ -37,14 +37,13 @@ app.listen(APP_PORT, APP_HOST); app.get('/app/listen', function(){}); app.use('/app/message', function(req, res) { - res.find(req.param('to'), function(user) { - if(!user) - return res.send(new packages.Error('not online')); - - res.message(user, new packages.Message( - req.session.data('username'), - req.param('body') - )); + res.find(req.param('to'), function(to) { + if(to) { + res.message(to, req.event); + } else { + req.event._status = {sent: false, e: 'not online'}; + res.jsonp(req.event); + } }); }); @@ -52,33 +51,37 @@ app.use('/app/message/typing', function(req, res) { if(~packages.TYPING_STATES.indexOf('typing' + req.param('state'))) { res.find(req.param('to'), function(user) { if(user) { - res.message(user, new packages.Status( - req.session.data('username'), - 'typing' + req.param('state') - )); + req.event.status = 'typing' + req.param('state'); + res.message(user, req.event); + } else { + // Typing updates do not receive confirmations, + // as they are not important enough. + req.event._status = {sent: false, e: 'invalid user'}; + res.jsonp(req.event); } - - // Typing updates do not receive confirmations, - // as they are not important enough. - res.send(''); }); } else { - res.send(new packages.Error('invalid state')); + req.event._status = {sent: false, e: 'invalid state'}; + res.jsonp(req.event); } }); app.use('/app/status', function(req, res) { if(~packages.STATUSES.indexOf(req.param('status'))) { - res.status(req.param('status'), req.param('message')); - res.send(new packages.Success('status updated')); + res.status(req.event); } else { - res.send(new packages.Error('invalid status')); + req.event._status = {sent: false, e: 'invalid status'}; + res.jsonp(req.event); } }); +app.use('/app/noop', function(req, res) { + req.event._status = {sent: true}; + res.session.respond(res, req.event); +}); + app.use('/app/signoff', function(req, res) { - res.signOff(); - res.send(new packages.Success('goodbye')); + res.signOff(req.event); }); console.log('Ajax IM server started...'); diff --git a/server/libs/packages.js b/server/libs/packages.js index 63eb12b..6f297ac 100644 --- a/server/libs/packages.js +++ b/server/libs/packages.js @@ -1,88 +1,2 @@ -var sys = require('sys'); - -var Package = function() {}; -Package.prototype._sanitize = function(content) { - // strip HTML - return content.replace(/<(.|\n)*?>/g, ''); -}; - -var Error = exports.Error = function(error) { - this.error = error; -}; -sys.inherits(Error, Package); -Error.prototype.toJSON = function() { - return { - type: 'error', - error: this.error - }; -}; - -var Success = exports.Success = function(success) { - this.success = success; -}; -sys.inherits(Success, Package); -Success.prototype.toJSON = function() { - return { - type: 'success', - success: this.success - }; -}; - -var Message = exports.Message = function(from, body) { - this.from = from; - this.body = body; -}; -sys.inherits(Message, Package); -Message.prototype.toJSON = function() { - return { - type: 'message', - user: this.from, - body: this._sanitize(this.body) - }; -}; - -var Notice = exports.Notice = function(username, info) { - this.username = username; - this.info = info; -}; -sys.inherits(Notice, Package); -Notice.prototype.toJSON = function() { - return { - type: 'notice', - user: this.username, - info: this.info - }; -}; - exports.TYPING_STATES = ['typing+', 'typing~', 'typing-']; exports.STATUSES = ['available', 'away', 'idle']; -var Status = exports.Status = function(username, status, message) { - var statuses = exports.STATUSES + exports.TYPING_STATES; - - this.username = username; - this.status = -~statuses.indexOf(status) ? status : statuses[0]; - this.message = message; -}; -sys.inherits(Status, Package); -Status.prototype.toJSON = function() { - return { - type: 'status', - user: this.username, - status: this.status, - message: this._sanitize(this.message || '') - }; -}; - -var Offline = exports.Offline = function(username) { - this.username = username; -}; -sys.inherits(Offline, Package); -Offline.prototype.toJSON = function() { - // A special type of status - return { - type: 'status', - user: this.username, - status: 'offline', - message: '' - }; -}; diff --git a/server/libs/utils.js b/server/libs/utils.js index f7d8608..77b066a 100644 --- a/server/libs/utils.js +++ b/server/libs/utils.js @@ -9,6 +9,21 @@ module.exports = o_ = { a[keys[i]] = b[keys[i]]; return a; }, + + extend: function() { + var o = {}; + for (var a=0; a < arguments.length; ++a) { + this.merge(o, arguments[a]); + } + return o; + }, + + deletekey: function(o, key) { + if (o.hasOwnProperty(key)) { + delete o[key]; + } + return o; + }, values: function(obj) { if(typeof obj == 'array') diff --git a/server/middleware/im.js b/server/middleware/im.js index d928974..39f123f 100644 --- a/server/middleware/im.js +++ b/server/middleware/im.js @@ -1,5 +1,6 @@ var url = require('url'), - Hub = require('./im/hub'); + Hub = require('./im/hub'), + o_ = require('../libs/utils'); module.exports = function setupHub(options) { options = options || {}; @@ -59,18 +60,25 @@ module.exports = function setupHub(options) { if(msg = sess.message_queue.shift()) sess._send.apply(sess, msg); } else { - sess.connection = res; + req.event = o_.extend({}, req.query, req.body, req.params); + o_.deletekey(req.event, 'callback'); + o_.deletekey(req.event, 'sessionid'); + o_.deletekey(req.event, '_'); + req.event.from = sess.data('username'); } req.session = sess; + res.session = sess; res.find = store.find.bind(store); - res.message = function(to, package) { - store.message(req.session, to, package); + res.message = function(to, event) { + store.message(res, to, event); }; - res.status = function(value, message) { - req.session.status(value, message); + res.status = function(event) { + req.session.status(res, event); + }; + res.signOff = function(event) { + store.signOff(req.sessionID, res, event); }; - res.signOff = function() { store.signOff(req.sessionID); }; if(url.parse(req.url).pathname !== '/app/listen') { next(); diff --git a/server/middleware/im/hub.js b/server/middleware/im/hub.js index bbe2003..d535c15 100644 --- a/server/middleware/im/hub.js +++ b/server/middleware/im/hub.js @@ -18,15 +18,14 @@ var Hub = module.exports = function Hub(options) { }, this.reapInterval, this); } - this.events.addListener('update', o_.bind(function(package) { - var _package = package.toJSON(); - if(package.type == 'status' && package.status == 'offline') { + this.events.addListener('update', o_.bind(function(event) { + if(event.type == 'status' && event.status == 'offline') { var sids = Object.keys(this.sessions), sid, sess; for(sid in this.sessions) { sess = this.sessions[sid]; - if(sess.data('username') == package.username) { + if(sess.data('username') == event.from) { if(sess.listeners.length) - sess.send(200, {type: 'goodbye'}); + sess.send({type: 'goodbye'}); delete this.sessions[sid]; break; } @@ -45,7 +44,8 @@ Hub.prototype.reap = function(ms) { for(var i = 0, len = sids.length; i < len; ++i) { var sid = sids[i], sess = this.sessions[sid]; if(sess.lastAccess < threshold) { - this.events.emit('update', new packages.Offline(sess.data('username'))); + var event = {type: 'status', from: sess.data('username'), status: 'offline', message: ''}; + this.events.emit('update', event); } } }; @@ -75,19 +75,15 @@ Hub.prototype.get = function(req, fn) { session._friends(friends_copy); session.events.addListener('status', o_.bind(function(value, message) { - this.events.emit( - 'update', - new packages.Status(session.data('username'), - value, - message) - ); + var event = {type: 'status', from: session.data('username'), status: value, message: message}; + this.events.emit('update', event); }, this)); this.events.addListener('update', o_.bind(session.receivedUpdate, session)); this.set(req.sessionID, session); fn(null, session); }, this)); - session.status(packages.STATUSES[0], ''); + session.status(null, {status: packages.STATUSES[0], message: ''}); } else { fn(); } @@ -112,19 +108,25 @@ Hub.prototype.find = function(username, fn) { fn(false); }; -Hub.prototype.message = function(from, to, package) { +Hub.prototype.message = function(res, to, event) { try { - package.user = from; - to.send(package); - from.respond(new packages.Success('sent')); + to.send(event); + event._status = {sent: true}; + res.session.respond(res, event); } catch(e) { - from.respond(new packages.Error(e.description)); + event._status = {sent: false, e: e.description}; + res.session.respond(res, event); } }; -Hub.prototype.signOff = function(sid) { - if(sid in this.sessions) - this.events.emit('update', - new packages.Offline( - this.sessions[sid].data('username'))); +Hub.prototype.signOff = function(sid, res, event) { + if (sid in this.sessions) { + event.status = 'offline'; + event.message = ''; + this.events.emit('update', event); + } + event._status = {sent: true}; + if (res) { + res.session.respond(res, event); + } }; diff --git a/server/middleware/im/user.js b/server/middleware/im/user.js index adf97c7..6d228f7 100644 --- a/server/middleware/im/user.js +++ b/server/middleware/im/user.js @@ -18,18 +18,20 @@ var User = module.exports = function(req, data) { setInterval(o_.bind(this._expireConns, this), 500); }; -User.prototype.receivedUpdate = function(package) { - if(this.friends.indexOf(package.username)) - this.send(package); +User.prototype.receivedUpdate = function(event) { + event = o_.extend({}, event); + event.to = this.data('username'); + if(this.friends.indexOf(event.from)) + this.send(event); }; User.prototype._friends = function(friends) { this.friends = friends; - this.send(JSON.stringify({ + this.send({ type: 'hello', username: this.data('username'), friends: friends - })); + }); }; User.prototype._expireConns = function() { @@ -54,52 +56,28 @@ User.prototype.listener = function(conn) { this.listeners.push(conn); }; -User.prototype.respond = function(code, message, callback) { - this._send(this.req.jsonpCallback? 'listener': 'connection', code, message, callback); +User.prototype.respond = function(res, event) { + this._send('connection', event, res); }; -User.prototype.send = function(code, message, callback) { - this._send('listener', code, message, callback); +User.prototype.send = function(event) { + this._send('listener', event); }; -User.prototype.addCallback = function(message) { - return ((typeof this.req.jsonpCallback) != 'undefined')? this.req.jsonpCallback+'('+message+');': message; -}; - -User.prototype._send = function(type, code, message, callback) { - if(!message && typeof code != 'number') { - callback = message; - message = code; - code = 200; - } - - if(typeof message != 'string') - message = JSON.stringify(message); - - if(type == 'connection' && this.connection) { +User.prototype._send = function(type, event, res) { + if(type == 'connection') { // end a regular connection with a response - this.connection.writeHead(code || 200, { -// 'Content-Type': 'application/json', - 'Content-Type': 'application/javascript', - 'Content-Length': this.addCallback(message).length - }); - this.connection.end(this.addCallback(message)); + res.jsonp(event); } else { - // add a message to a long-polling connection + // end a long-polling connection with an event if(!this.listeners.length) return this.message_queue.push(arguments); var cx = this.listeners.slice(), conn; this.listeners = []; while(conn = cx.shift()) { - conn.writeHead(code || 200, { -// 'Content-Type': 'application/json', - 'Content-Type': 'application/javascript', - 'Content-Length': this.addCallback(message).length - }); - conn.end(this.addCallback(message)); + conn.jsonp(event); } - if(callback) callback(); } }; @@ -114,11 +92,15 @@ User.prototype.touch = function() { this.lastAccess = +new Date; }; -User.prototype.status = function(value, message) { - if(!value) +User.prototype.status = function(res, event) { + if(!event) return this._status; - this._status = value; - this._status_message = message; - this.events.emit('status', value, message); + this._status = event.status; + this._status_message = event.message; + this.events.emit('status', event.status, event.message); + event._status = {sent: true}; + if (res) { + this.respond(res, event); + } };