1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 | var debug = require('debug')('antisocial-friends:activity');
|
10 | var VError = require('verror').VError;
|
11 | var async = require('async');
|
12 | var IO = require('socket.io');
|
13 | var IOAuth = require('socketio-auth');
|
14 | var cryptography = require('antisocial-encryption');
|
15 | var refresh = require('../lib/utilities').refresh;
|
16 |
|
17 | module.exports = function activityFeedMount(antisocialApp, expressListener) {
|
18 | var db = antisocialApp.db;
|
19 |
|
20 | debug('mounting ws /antisocial-activity');
|
21 |
|
22 | if (!antisocialApp.openActivityListeners) {
|
23 | antisocialApp.openActivityListeners = {};
|
24 | }
|
25 |
|
26 | antisocialApp.ioActivity = IO(expressListener, {
|
27 | 'path': '/antisocial-activity'
|
28 | });
|
29 |
|
30 | antisocialApp.ioActivity.on('connect', function (soc) {
|
31 | debug('%s /antisocial-activity connect', soc.id);
|
32 | soc.on('disconnect', function (e) {
|
33 | debug('%s /antisocial-activity disconnect %s', soc.id, e);
|
34 | });
|
35 | soc.on('error', function (e) {
|
36 | debug('%s /antisocial-activity error %s', soc.id, e);
|
37 | });
|
38 | });
|
39 |
|
40 |
|
41 |
|
42 |
|
43 |
|
44 | IOAuth(antisocialApp.ioActivity, {
|
45 | 'timeout': 60000,
|
46 | 'authenticate': function (socket, data, callback) {
|
47 | debug('%s /antisocial-activity authenticate', socket.id);
|
48 |
|
49 | if (!data.friendAccessToken) {
|
50 | callback(new VError('friendAccessToken not supplied'), false);
|
51 | }
|
52 | if (!data.username) {
|
53 | callback(new VError('username not supplied'), false);
|
54 | }
|
55 |
|
56 | async.waterfall([
|
57 | function getUser(cb) {
|
58 | db.getInstances('users', [{
|
59 | 'property': 'username',
|
60 | 'value': data.username
|
61 | }], function (err, userInstances) {
|
62 | if (err) {
|
63 | return cb(new VError(err, 'user not found'));
|
64 | }
|
65 |
|
66 | if (userInstances.length > 1) {
|
67 | return cb(new VError('more than one user matching username'));
|
68 | }
|
69 |
|
70 | if (!userInstances.length) {
|
71 | return cb(new VError('user not found'));
|
72 | }
|
73 |
|
74 | var user = userInstances[0];
|
75 |
|
76 | cb(err, user);
|
77 | });
|
78 | },
|
79 | function findFriend(user, cb) {
|
80 | db.getInstances('friends', [{
|
81 | 'property': 'userId',
|
82 | 'value': user.id
|
83 | }, {
|
84 | 'property': 'localAccessToken',
|
85 | 'value': data.friendAccessToken
|
86 | }], function (err, friendInstances) {
|
87 | if (err) {
|
88 | return cb(new VError(err, 'error reading friends'));
|
89 | }
|
90 |
|
91 | if (!friendInstances.length) {
|
92 | return cb(new VError(err, 'friend not found'));
|
93 | }
|
94 |
|
95 | cb(err, user, friendInstances[0]);
|
96 | });
|
97 | }
|
98 | ], function (err, user, friend) {
|
99 | if (err) {
|
100 | debug('%s /antisocial-activity authenticate error %s', socket.id, err.message);
|
101 | return callback(err);
|
102 | }
|
103 | if (friend.status !== 'accepted') {
|
104 | return callback(new VError(err, 'friend not accepted'), false);
|
105 | }
|
106 | data.friend = friend;
|
107 | data.user = user;
|
108 |
|
109 | var key = data.user.username + '<-' + data.friend.remoteEndPoint;
|
110 |
|
111 | if (antisocialApp.openActivityListeners[key]) {
|
112 | debug('%s /antisocial-activity abort already connected %s', socket.id, key);
|
113 | return callback(new VError(err, 'already connected ' + key), false);
|
114 | }
|
115 |
|
116 | antisocialApp.openActivityListeners[key] = socket;
|
117 |
|
118 | callback(null, true);
|
119 | });
|
120 | },
|
121 | 'postAuthenticate': function (socket, data) {
|
122 | debug('%s /antisocial-activity postAuthenticate', socket.id);
|
123 |
|
124 | socket.antisocial = {
|
125 | 'friend': data.friend,
|
126 | 'user': data.user,
|
127 | 'key': data.user.username + '<-' + data.friend.remoteEndPoint
|
128 | };
|
129 |
|
130 | debug('%s /antisocial-activity connection established %s', socket.id, socket.antisocial.key);
|
131 |
|
132 | socket.antisocial.emitter = function (appId, eventType, data) {
|
133 | refresh(antisocialApp, socket, function (err) {
|
134 | if (err) {
|
135 | console.log('emitter socket data refresh error', err.message, err.cause().message);
|
136 | return;
|
137 | }
|
138 | var message = {
|
139 | 'appId': appId,
|
140 | 'data': data
|
141 | };
|
142 |
|
143 | debug('emitter (feed mount)', eventType, message);
|
144 | message = cryptography.encrypt(socket.antisocial.friend.remotePublicKey, socket.antisocial.friend.keys.private, JSON.stringify(message));
|
145 | socket.emit(eventType, message);
|
146 | });
|
147 | };
|
148 |
|
149 | socket.on('highwater', function (data) {
|
150 | refresh(antisocialApp, socket, function (err) {
|
151 | if (err) {
|
152 | console.log('highwater event socket data refresh error', err.message, err.cause().message);
|
153 | return;
|
154 | }
|
155 |
|
156 | var decrypted = cryptography.decrypt(socket.antisocial.friend.remotePublicKey, socket.antisocial.friend.keys.private, data);
|
157 | if (!decrypted.valid) {
|
158 | console.log('WatchNewsFeedItem decryption signature validation error:', decrypted.invalidReason);
|
159 | return;
|
160 | }
|
161 |
|
162 | data = decrypted.data;
|
163 | debug('%s /antisocial-activity got highwater from %s %s', socket.id, socket.antisocial.key, data);
|
164 |
|
165 | if (!decrypted.contentType || decrypted.contentType === 'application/json') {
|
166 | try {
|
167 | data = JSON.parse(decrypted.data);
|
168 | }
|
169 | catch (e) {
|
170 | data = decrypted.data;
|
171 | }
|
172 | }
|
173 |
|
174 | var appid = data.appId;
|
175 | antisocialApp.emit('activity-backfill-' + appid, socket.antisocial.user, socket.antisocial.friend, data.data, socket.antisocial.emitter);
|
176 | });
|
177 | });
|
178 |
|
179 | socket.on('data', function (data) {
|
180 | refresh(antisocialApp, socket, function (err) {
|
181 | if (err) {
|
182 | console.log('data event socket data refresh error', err.message, err.cause().message);
|
183 | return;
|
184 | }
|
185 |
|
186 | debug('%s /antisocial-activity data from %s', socket.id, socket.antisocial.key);
|
187 |
|
188 | var decrypted = cryptography.decrypt(socket.antisocial.friend.remotePublicKey, socket.antisocial.friend.keys.private, data);
|
189 | if (!decrypted.valid) {
|
190 | debug('%s /antisocial-activity decryption signature validation error:', socket.id, decrypted.invalidReason);
|
191 | return;
|
192 | }
|
193 |
|
194 | data = decrypted.data;
|
195 |
|
196 | if (!decrypted.contentType || decrypted.contentType === 'application/json') {
|
197 | try {
|
198 | data = JSON.parse(decrypted.data);
|
199 | }
|
200 | catch (e) {
|
201 | data = '';
|
202 | }
|
203 | }
|
204 |
|
205 | var appid = data.appId;
|
206 | debug('%s /antisocial-activity emitting activity-data-' + appid, socket.id);
|
207 | antisocialApp.emit('activity-data-' + appid, socket.antisocial.user, socket.antisocial.friend, data.data);
|
208 | });
|
209 | });
|
210 |
|
211 | socket.on('disconnect', function (reason) {
|
212 | refresh(antisocialApp, socket, function (err) {
|
213 | if (err) {
|
214 | console.log('disconnect event socket data refresh error', err.message, err.cause().message);
|
215 | return;
|
216 | }
|
217 |
|
218 | debug('%s /antisocial-activity disconnect %s %s', socket.id, socket.antisocial.key, reason);
|
219 | antisocialApp.emit('close-activity-connection', socket.antisocial.user, socket.antisocial.friend, reason, socket.antisocial);
|
220 | db.updateInstance('friends', socket.antisocial.friend.id, {
|
221 | 'online': false
|
222 | });
|
223 | delete antisocialApp.openActivityListeners[socket.antisocial.key];
|
224 | });
|
225 | });
|
226 |
|
227 | antisocialApp.emit('open-activity-connection', socket.antisocial.user, socket.antisocial.friend, socket.antisocial.emitter, socket.antisocial);
|
228 |
|
229 | db.updateInstance('friends', socket.antisocial.friend.id, {
|
230 | 'online': true
|
231 | });
|
232 | }
|
233 | });
|
234 | };
|