1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 | var debug = require('debug')('antisocial-friends:activity');
|
10 | var VError = require('verror').VError;
|
11 | var async = require('async');
|
12 | var IO = require('socket.io');
|
13 | var IOAuth = require('socketio-auth');
|
14 | var cryptography = require('antisocial-encryption');
|
15 |
|
16 | module.exports = function activityFeedMount(antisocialApp, expressListener) {
|
17 | var db = antisocialApp.db;
|
18 |
|
19 | debug('mounting ws /antisocial-activity');
|
20 |
|
21 | if (!antisocialApp.openActivityListeners) {
|
22 | antisocialApp.openActivityListeners = {};
|
23 | }
|
24 |
|
25 | antisocialApp.ioActivity = IO(expressListener, {
|
26 | 'path': '/antisocial-activity'
|
27 | });
|
28 |
|
29 | antisocialApp.ioActivity.on('connect', function (soc) {
|
30 | debug('%s /antisocial-activity connect', soc.id);
|
31 | soc.on('disconnect', function (e) {
|
32 | debug('%s /antisocial-activity disconnect %s', soc.id, e);
|
33 | });
|
34 | soc.on('error', function (e) {
|
35 | debug('%s /antisocial-activity error %s', soc.id, e);
|
36 | });
|
37 | });
|
38 |
|
39 |
|
40 |
|
41 |
|
42 |
|
43 | IOAuth(antisocialApp.ioActivity, {
|
44 | 'timeout': 60000,
|
45 | 'authenticate': function (socket, data, callback) {
|
46 | debug('%s /antisocial-activity authenticate', socket.id);
|
47 |
|
48 | if (!data.friendAccessToken) {
|
49 | callback(new VError('friendAccessToken not supplied'), false);
|
50 | }
|
51 | if (!data.username) {
|
52 | callback(new VError('username not supplied'), false);
|
53 | }
|
54 |
|
55 | async.waterfall([
|
56 | function getUser(cb) {
|
57 | db.getInstances('users', [{
|
58 | 'property': 'username',
|
59 | 'value': data.username
|
60 | }], function (err, userInstances) {
|
61 | if (err) {
|
62 | return cb(new VError(err, 'user not found'));
|
63 | }
|
64 |
|
65 | if (userInstances.length > 1) {
|
66 | return cb(new VError('more than one user matching username'));
|
67 | }
|
68 |
|
69 | if (!userInstances.length) {
|
70 | return cb(new VError('user not found'));
|
71 | }
|
72 |
|
73 | var user = userInstances[0];
|
74 |
|
75 | cb(err, user);
|
76 | });
|
77 | },
|
78 | function findFriend(user, cb) {
|
79 | db.getInstances('friends', [{
|
80 | 'property': 'userId',
|
81 | 'value': user.id
|
82 | }, {
|
83 | 'property': 'localAccessToken',
|
84 | 'value': data.friendAccessToken
|
85 | }], function (err, friendInstances) {
|
86 | if (err) {
|
87 | return cb(new VError(err, 'error reading friends'));
|
88 | }
|
89 |
|
90 | if (!friendInstances.length) {
|
91 | return cb(new VError(err, 'friend not found'));
|
92 | }
|
93 |
|
94 | cb(err, user, friendInstances[0]);
|
95 | });
|
96 | }
|
97 | ], function (err, user, friend) {
|
98 | if (err) {
|
99 | debug('%s /antisocial-activity authenticate error %s', socket.id, err.message);
|
100 | return callback(err);
|
101 | }
|
102 | if (friend.status !== 'accepted') {
|
103 | return callback(new VError(err, 'friend not accepted'), false);
|
104 | }
|
105 | data.friend = friend;
|
106 | data.user = user;
|
107 |
|
108 | var key = data.user.username + '<-' + data.friend.remoteEndPoint;
|
109 |
|
110 | if (antisocialApp.openActivityListeners[key]) {
|
111 | debug('%s /antisocial-activity abort already connected %s', socket.id, key);
|
112 | return callback(new VError(err, 'already connected ' + key), false);
|
113 | }
|
114 |
|
115 | antisocialApp.openActivityListeners[key] = socket;
|
116 |
|
117 | callback(null, true);
|
118 | });
|
119 | },
|
120 | 'postAuthenticate': function (socket, data) {
|
121 | debug('%s /antisocial-activity postAuthenticate', socket.id);
|
122 |
|
123 | socket.antisocial = {
|
124 | 'friend': data.friend,
|
125 | 'user': data.user,
|
126 | 'key': data.user.username + '<-' + data.friend.remoteEndPoint
|
127 | };
|
128 |
|
129 | debug('%s /antisocial-activity connection established %s', socket.id, socket.antisocial.key);
|
130 |
|
131 | socket.antisocial.emitter = function (appId, eventType, data) {
|
132 | var message = {
|
133 | 'appId': appId,
|
134 | 'data': data
|
135 | };
|
136 |
|
137 | debug('emitter (feed mount)', eventType, message);
|
138 | message = cryptography.encrypt(socket.antisocial.friend.remotePublicKey, socket.antisocial.friend.keys.private, JSON.stringify(message));
|
139 | socket.emit(eventType, message);
|
140 | };
|
141 |
|
142 | socket.on('highwater', function (data) {
|
143 | var decrypted = cryptography.decrypt(socket.antisocial.friend.remotePublicKey, socket.antisocial.friend.keys.private, data);
|
144 | if (!decrypted.valid) {
|
145 | console.log('WatchNewsFeedItem decryption signature validation error:', decrypted.invalidReason);
|
146 | return;
|
147 | }
|
148 |
|
149 | data = decrypted.data;
|
150 | debug('%s /antisocial-activity got highwater from %s %s', socket.id, socket.antisocial.key, data);
|
151 |
|
152 | if (!decrypted.contentType || decrypted.contentType === 'application/json') {
|
153 | try {
|
154 | data = JSON.parse(decrypted.data);
|
155 | }
|
156 | catch (e) {
|
157 | data = decrypted.data;
|
158 | }
|
159 | }
|
160 |
|
161 | var appid = data.appId;
|
162 | antisocialApp.emit('activity-backfill-' + appid, socket.antisocial.user, socket.antisocial.friend, data.data, socket.antisocial.emitter);
|
163 | });
|
164 |
|
165 | socket.on('data', function (data) {
|
166 | debug('%s /antisocial-activity data from %s', socket.id, socket.antisocial.key);
|
167 |
|
168 | var decrypted = cryptography.decrypt(socket.antisocial.friend.remotePublicKey, socket.antisocial.friend.keys.private, data);
|
169 | if (!decrypted.valid) {
|
170 | debug('%s /antisocial-activity decryption signature validation error:', socket.id, decrypted.invalidReason);
|
171 | return;
|
172 | }
|
173 |
|
174 | data = decrypted.data;
|
175 |
|
176 | if (!decrypted.contentType || decrypted.contentType === 'application/json') {
|
177 | try {
|
178 | data = JSON.parse(decrypted.data);
|
179 | }
|
180 | catch (e) {
|
181 | data = '';
|
182 | }
|
183 | }
|
184 |
|
185 | var appid = data.appId;
|
186 | debug('%s /antisocial-activity emitting activity-data-' + appid, socket.id);
|
187 | antisocialApp.emit('activity-data-' + appid, socket.antisocial.user, socket.antisocial.friend, data.data);
|
188 | });
|
189 |
|
190 | socket.on('disconnect', function (reason) {
|
191 | debug('%s /antisocial-activity disconnect %s %s', socket.id, socket.antisocial.key, reason);
|
192 | antisocialApp.emit('close-activity-connection', socket.antisocial.user, socket.antisocial.friend, reason, socket.antisocial);
|
193 | db.updateInstance('friends', socket.antisocial.friend.id, {
|
194 | 'online': false
|
195 | });
|
196 | delete antisocialApp.openActivityListeners[socket.antisocial.key];
|
197 | });
|
198 |
|
199 | antisocialApp.emit('open-activity-connection', socket.antisocial.user, socket.antisocial.friend, socket.antisocial.emitter, socket.antisocial);
|
200 |
|
201 | db.updateInstance('friends', socket.antisocial.friend.id, {
|
202 | 'online': true
|
203 | });
|
204 | }
|
205 | });
|
206 | };
|