UNPKG

7.49 kBJavaScriptView Raw
// Copyright Michael Rhodes. 2017,2018. All Rights Reserved.
// This file is licensed under the MIT License.
// License text available at https://opensource.org/licenses/MIT

/*
 Mount the socket.io listener for incoming activity connections (server-to-server friends).
*/
8
9var debug = require('debug')('antisocial-friends:activity');
10var VError = require('verror').VError;
11var async = require('async');
12var IO = require('socket.io');
13var IOAuth = require('socketio-auth');
14var cryptography = require('antisocial-encryption');
15var refresh = require('../lib/utilities').refresh;
16
17module.exports = function activityFeedMount(antisocialApp, expressListener) {
18 var db = antisocialApp.db;
19
20 debug('mounting ws /antisocial-activity');
21
22 if (!antisocialApp.openActivityListeners) {
23 antisocialApp.openActivityListeners = {};
24 }
25
26 antisocialApp.ioActivity = IO(expressListener, {
27 'path': '/antisocial-activity'
28 });
29
30 antisocialApp.ioActivity.on('connect', function (soc) {
31 debug('%s /antisocial-activity connect', soc.id);
32 soc.on('disconnect', function (e) {
33 debug('%s /antisocial-activity disconnect %s', soc.id, e);
34 });
35 soc.on('error', function (e) {
36 debug('%s /antisocial-activity error %s', soc.id, e);
37 });
38 });
39
40 // friend activity feed
41 // authenticate using friendAccessToken
42 // then set up model observers and backfill any news since last connected
43
44 IOAuth(antisocialApp.ioActivity, {
45 'timeout': 60000,
46 'authenticate': function (socket, data, callback) {
47 debug('%s /antisocial-activity authenticate', socket.id);
48
49 if (!data.friendAccessToken) {
50 callback(new VError('friendAccessToken not supplied'), false);
51 }
52 if (!data.username) {
53 callback(new VError('username not supplied'), false);
54 }
55
56 async.waterfall([
57 function getUser(cb) {
58 db.getInstances('users', [{
59 'property': 'username',
60 'value': data.username
61 }], function (err, userInstances) {
62 if (err) {
63 return cb(new VError(err, 'user not found'));
64 }
65
66 if (userInstances.length > 1) {
67 return cb(new VError('more than one user matching username'));
68 }
69
70 if (!userInstances.length) {
71 return cb(new VError('user not found'));
72 }
73
74 var user = userInstances[0];
75
76 cb(err, user);
77 });
78 },
79 function findFriend(user, cb) {
80 db.getInstances('friends', [{
81 'property': 'userId',
82 'value': user.id
83 }, {
84 'property': 'localAccessToken',
85 'value': data.friendAccessToken
86 }], function (err, friendInstances) {
87 if (err) {
88 return cb(new VError(err, 'error reading friends'));
89 }
90
91 if (!friendInstances.length) {
92 return cb(new VError(err, 'friend not found'));
93 }
94
95 cb(err, user, friendInstances[0]);
96 });
97 }
98 ], function (err, user, friend) {
99 if (err) {
100 debug('%s /antisocial-activity authenticate error %s', socket.id, err.message);
101 return callback(err);
102 }
103 if (friend.status !== 'accepted') {
104 return callback(new VError(err, 'friend not accepted'), false);
105 }
106 data.friend = friend;
107 data.user = user;
108
109 var key = data.user.username + '<-' + data.friend.remoteEndPoint;
110
111 if (antisocialApp.openActivityListeners[key]) {
112 debug('%s /antisocial-activity abort already connected %s', socket.id, key);
113 return callback(new VError(err, 'already connected ' + key), false);
114 }
115
116 antisocialApp.openActivityListeners[key] = socket;
117
118 callback(null, true);
119 });
120 },
121 'postAuthenticate': function (socket, data) {
122 debug('%s /antisocial-activity postAuthenticate', socket.id);
123
124 socket.antisocial = {
125 'friend': data.friend,
126 'user': data.user,
127 'key': data.user.username + '<-' + data.friend.remoteEndPoint
128 };
129
130 debug('%s /antisocial-activity connection established %s', socket.id, socket.antisocial.key);
131
132 socket.antisocial.emitter = function (appId, eventType, data) {
133 refresh(antisocialApp, socket, function (err) {
134 if (err) {
135 console.log('emitter socket data refresh error', err.message, err.cause().message);
136 return;
137 }
138 var message = {
139 'appId': appId,
140 'data': data
141 };
142
143 debug('emitter (feed mount)', eventType, message);
144 message = cryptography.encrypt(socket.antisocial.friend.remotePublicKey, socket.antisocial.friend.keys.private, JSON.stringify(message));
145 socket.emit(eventType, message);
146 });
147 };
148
149 socket.on('highwater', function (data) {
150 refresh(antisocialApp, socket, function (err) {
151 if (err) {
152 console.log('highwater event socket data refresh error', err.message, err.cause().message);
153 return;
154 }
155
156 var decrypted = cryptography.decrypt(socket.antisocial.friend.remotePublicKey, socket.antisocial.friend.keys.private, data);
157 if (!decrypted.valid) { // could not validate signature
158 console.log('WatchNewsFeedItem decryption signature validation error:', decrypted.invalidReason);
159 return;
160 }
161
162 data = decrypted.data;
163 debug('%s /antisocial-activity got highwater from %s %s', socket.id, socket.antisocial.key, data);
164
165 if (!decrypted.contentType || decrypted.contentType === 'application/json') {
166 try {
167 data = JSON.parse(decrypted.data);
168 }
169 catch (e) {
170 data = decrypted.data;
171 }
172 }
173
174 var appid = data.appId;
175 antisocialApp.emit('activity-backfill-' + appid, socket.antisocial.user, socket.antisocial.friend, data.data, socket.antisocial.emitter);
176 });
177 });
178
179 socket.on('data', function (data) {
180 refresh(antisocialApp, socket, function (err) {
181 if (err) {
182 console.log('data event socket data refresh error', err.message, err.cause().message);
183 return;
184 }
185
186 debug('%s /antisocial-activity data from %s', socket.id, socket.antisocial.key);
187
188 var decrypted = cryptography.decrypt(socket.antisocial.friend.remotePublicKey, socket.antisocial.friend.keys.private, data);
189 if (!decrypted.valid) { // could not validate signature
190 debug('%s /antisocial-activity decryption signature validation error:', socket.id, decrypted.invalidReason);
191 return;
192 }
193
194 data = decrypted.data;
195
196 if (!decrypted.contentType || decrypted.contentType === 'application/json') {
197 try {
198 data = JSON.parse(decrypted.data);
199 }
200 catch (e) {
201 data = '';
202 }
203 }
204
205 var appid = data.appId;
206 debug('%s /antisocial-activity emitting activity-data-' + appid, socket.id);
207 antisocialApp.emit('activity-data-' + appid, socket.antisocial.user, socket.antisocial.friend, data.data);
208 });
209 });
210
211 socket.on('disconnect', function (reason) {
212 refresh(antisocialApp, socket, function (err) {
213 if (err) {
214 console.log('disconnect event socket data refresh error', err.message, err.cause().message);
215 return;
216 }
217
218 debug('%s /antisocial-activity disconnect %s %s', socket.id, socket.antisocial.key, reason);
219 antisocialApp.emit('close-activity-connection', socket.antisocial.user, socket.antisocial.friend, reason, socket.antisocial);
220 db.updateInstance('friends', socket.antisocial.friend.id, {
221 'online': false
222 });
223 delete antisocialApp.openActivityListeners[socket.antisocial.key];
224 });
225 });
226
227 antisocialApp.emit('open-activity-connection', socket.antisocial.user, socket.antisocial.friend, socket.antisocial.emitter, socket.antisocial);
228
229 db.updateInstance('friends', socket.antisocial.friend.id, {
230 'online': true
231 });
232 }
233 });
234};