1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 | var debug = require('debug')('antisocial-friends:activity');
|
10 | var VError = require('verror').VError;
|
11 | var async = require('async');
|
12 | var IO = require('socket.io');
|
13 | var IOAuth = require('socketio-auth');
|
14 | var cryptography = require('antisocial-encryption');
|
15 | var moment = require('moment');
|
16 |
|
17 | module.exports = function activityFeedMount(antisocialApp, expressListener) {
|
18 | var config = antisocialApp.config;
|
19 | var db = antisocialApp.db;
|
20 | var authUserMiddleware = antisocialApp.authUserMiddleware;
|
21 |
|
22 | debug('mounting ws /antisocial-activity');
|
23 |
|
24 | if (!antisocialApp.openActivityListeners) {
|
25 | antisocialApp.openActivityListeners = {};
|
26 | }
|
27 |
|
28 | antisocialApp.ioActivity = IO(expressListener, {
|
29 | 'path': '/antisocial-activity'
|
30 | });
|
31 |
|
32 | antisocialApp.ioActivity.on('connect', function (soc) {
|
33 | debug('got connect', soc.id);
|
34 | soc.on('disconnect', function (e) {
|
35 | debug('got disconnect %s %s', soc.id, e);
|
36 | });
|
37 | soc.on('error', function (e) {
|
38 | debug('got error %s %s', soc.id, e);
|
39 | });
|
40 | });
|
41 |
|
42 |
|
43 |
|
44 |
|
45 |
|
46 | IOAuth(antisocialApp.ioActivity, {
|
47 | 'timeout': 60000,
|
48 | 'authenticate': function (socket, data, callback) {
|
49 | debug('authenticate %s', socket.id);
|
50 |
|
51 | if (!data.friendAccessToken) {
|
52 | callback(new VError('friendAccessToken not supplied'), false);
|
53 | }
|
54 | if (!data.username) {
|
55 | callback(new VError('username not supplied'), false);
|
56 | }
|
57 |
|
58 | async.waterfall([
|
59 | function getUser(cb) {
|
60 | db.getInstances('users', [{
|
61 | 'property': 'username',
|
62 | 'value': data.username
|
63 | }], function (err, userInstances) {
|
64 | if (err) {
|
65 | return cb(new VError(err, 'user not found'));
|
66 | }
|
67 |
|
68 | if (userInstances.length > 1) {
|
69 | return cb(new VError('more than one user matching username'));
|
70 | }
|
71 |
|
72 | if (!userInstances.length) {
|
73 | return cb(new VError('user not found'));
|
74 | }
|
75 |
|
76 | var user = userInstances[0];
|
77 |
|
78 | cb(err, user);
|
79 | });
|
80 | },
|
81 | function findFriend(user, cb) {
|
82 | db.getInstances('friends', [{
|
83 | 'property': 'userId',
|
84 | 'value': user.id
|
85 | }, {
|
86 | 'property': 'localAccessToken',
|
87 | 'value': data.friendAccessToken
|
88 | }], function (err, friendInstances) {
|
89 | if (err) {
|
90 | return cb(new VError(err, 'error reading friends'));
|
91 | }
|
92 |
|
93 | if (!friendInstances.length) {
|
94 | return cb(new VError(err, 'friend not found'));
|
95 | }
|
96 |
|
97 | cb(err, user, friendInstances[0]);
|
98 | });
|
99 | }
|
100 | ], function (err, user, friend) {
|
101 | if (err) {
|
102 | debug('authenticate error %s', err.message);
|
103 | return callback(err);
|
104 | }
|
105 | if (friend.status !== 'accepted') {
|
106 | return callback(new VError(err, 'friend not accepted'), false);
|
107 | }
|
108 | data.friend = friend;
|
109 | data.user = user;
|
110 |
|
111 | var key = data.user.username + '<-' + data.friend.remoteEndPoin;
|
112 |
|
113 | if (antisocialApp.openActivityListeners[key]) {
|
114 | debug('activityFeedSubscribeConnect abort already connected %s', key);
|
115 | return callback(new VError(err, 'already connected ' + key), false);
|
116 | }
|
117 |
|
118 | antisocialApp.openActivityListeners[key] = socket;
|
119 |
|
120 | callback(null, true);
|
121 | });
|
122 | },
|
123 | 'postAuthenticate': function (socket, data) {
|
124 | debug('postAuthenticate %s', socket.id);
|
125 |
|
126 | socket.antisocial = {
|
127 | 'friend': data.friend,
|
128 | 'user': data.user,
|
129 | 'key': data.user.username + '<-' + data.friend.remoteEndPoint,
|
130 | 'setDataHandler': function setDataHandler(handler) {
|
131 | socket.antisocial.dataHandler = handler;
|
132 | }
|
133 | };
|
134 |
|
135 | debug('connection established %s %s', socket.id, socket.antisocial.key);
|
136 |
|
137 | antisocialApp.emit('open-activity-connection', {
|
138 | 'info': socket.antisocial,
|
139 | 'socket': socket
|
140 | });
|
141 |
|
142 | socket.on('highwater', function (highwater) {
|
143 | debug('got highwater from %s %s', socket.id, socket.antisocial.key, highwater);
|
144 | antisocialApp.emit('activity-backfill', {
|
145 | 'info': socket.antisocial,
|
146 | 'socket': socket,
|
147 | 'highwater': highwater
|
148 | });
|
149 | });
|
150 |
|
151 | socket.on('data', function (data) {
|
152 | debug('got data from %s %s', socket.id, socket.antisocial.key);
|
153 |
|
154 | var decrypted = cryptography.decrypt(socket.antisocial.friend.remotePublicKey, socket.antisocial.friend.keys.private, data);
|
155 | if (!decrypted.valid) {
|
156 | debug('decryption signature validation error:', decrypted.invalidReason);
|
157 | return;
|
158 | }
|
159 |
|
160 | data = decrypted.data;
|
161 |
|
162 | if (!decrypted.contentType || decrypted.contentType === 'application/json') {
|
163 | try {
|
164 | data = JSON.parse(decrypted.data);
|
165 | }
|
166 | catch (e) {
|
167 | data = '';
|
168 | }
|
169 | }
|
170 |
|
171 | if (socket.antisocial.dataHandler) {
|
172 | socket.antisocial.dataHandler(data);
|
173 | }
|
174 | else {
|
175 | debug('no data handler for %s', socket.antisocial.key);
|
176 | }
|
177 | });
|
178 |
|
179 | socket.on('disconnect', function (reason) {
|
180 | debug('got disconnect %s %s %s', socket.id, socket.antisocial.key, reason);
|
181 | antisocialApp.emit('close-activity-connection', {
|
182 | 'info': socket.antisocial,
|
183 | 'reason': reason
|
184 | });
|
185 | db.updateInstance('friends', socket.antisocial.friend.id, {
|
186 | 'online': false
|
187 | });
|
188 | delete antisocialApp.openActivityListeners[socket.antisocial.key];
|
189 | });
|
190 |
|
191 | socket.emit('highwater', socket.antisocial.friend.highWater ? socket.antisocial.friend.highWater : moment().subtract(7, 'd').toISOString());
|
192 |
|
193 | db.updateInstance('friends', socket.antisocial.friend.id, {
|
194 | 'online': true
|
195 | });
|
196 | }
|
197 | });
|
198 | };
|