UNPKG

6.62 kBJavaScriptView Raw
1// Copyright Michael Rhodes. 2017,2018. All Rights Reserved.
2// This file is licensed under the MIT License.
3// License text available at https://opensource.org/licenses/MIT
4
5/*
6 mount socket.io listener for incoming activity connections (server to server friends)
7*/
8
9var debug = require('debug')('antisocial-friends:activity');
10var VError = require('verror').VError;
11var async = require('async');
12var IO = require('socket.io');
13var IOAuth = require('socketio-auth');
14var cryptography = require('antisocial-encryption');
15
16module.exports = function activityFeedMount(antisocialApp, expressListener) {
17 var db = antisocialApp.db;
18
19 debug('mounting ws /antisocial-activity');
20
21 if (!antisocialApp.openActivityListeners) {
22 antisocialApp.openActivityListeners = {};
23 }
24
25 antisocialApp.ioActivity = IO(expressListener, {
26 'path': '/antisocial-activity'
27 });
28
29 antisocialApp.ioActivity.on('connect', function (soc) {
30 debug('%s /antisocial-activity connect', soc.id);
31 soc.on('disconnect', function (e) {
32 debug('%s /antisocial-activity disconnect %s', soc.id, e);
33 });
34 soc.on('error', function (e) {
35 debug('%s /antisocial-activity error %s', soc.id, e);
36 });
37 });
38
39 // friend activity feed
40 // authenticate using friendAccessToken
41 // then set up model observers and backfill any news since last connected
42
43 IOAuth(antisocialApp.ioActivity, {
44 'timeout': 60000,
45 'authenticate': function (socket, data, callback) {
46 debug('%s /antisocial-activity authenticate', socket.id);
47
48 if (!data.friendAccessToken) {
49 callback(new VError('friendAccessToken not supplied'), false);
50 }
51 if (!data.username) {
52 callback(new VError('username not supplied'), false);
53 }
54
55 async.waterfall([
56 function getUser(cb) {
57 db.getInstances('users', [{
58 'property': 'username',
59 'value': data.username
60 }], function (err, userInstances) {
61 if (err) {
62 return cb(new VError(err, 'user not found'));
63 }
64
65 if (userInstances.length > 1) {
66 return cb(new VError('more than one user matching username'));
67 }
68
69 if (!userInstances.length) {
70 return cb(new VError('user not found'));
71 }
72
73 var user = userInstances[0];
74
75 cb(err, user);
76 });
77 },
78 function findFriend(user, cb) {
79 db.getInstances('friends', [{
80 'property': 'userId',
81 'value': user.id
82 }, {
83 'property': 'localAccessToken',
84 'value': data.friendAccessToken
85 }], function (err, friendInstances) {
86 if (err) {
87 return cb(new VError(err, 'error reading friends'));
88 }
89
90 if (!friendInstances.length) {
91 return cb(new VError(err, 'friend not found'));
92 }
93
94 cb(err, user, friendInstances[0]);
95 });
96 }
97 ], function (err, user, friend) {
98 if (err) {
99 debug('%s /antisocial-activity authenticate error %s', socket.id, err.message);
100 return callback(err);
101 }
102 if (friend.status !== 'accepted') {
103 return callback(new VError(err, 'friend not accepted'), false);
104 }
105 data.friend = friend;
106 data.user = user;
107
108 var key = data.user.username + '<-' + data.friend.remoteEndPoint;
109
110 if (antisocialApp.openActivityListeners[key]) {
111 debug('%s /antisocial-activity abort already connected %s', socket.id, key);
112 return callback(new VError(err, 'already connected ' + key), false);
113 }
114
115 antisocialApp.openActivityListeners[key] = socket;
116
117 callback(null, true);
118 });
119 },
120 'postAuthenticate': function (socket, data) {
121 debug('%s /antisocial-activity postAuthenticate', socket.id);
122
123 socket.antisocial = {
124 'friend': data.friend,
125 'user': data.user,
126 'key': data.user.username + '<-' + data.friend.remoteEndPoint
127 };
128
129 debug('%s /antisocial-activity connection established %s', socket.id, socket.antisocial.key);
130
131 socket.antisocial.emitter = function (appId, eventType, data) {
132 var message = {
133 'appId': appId,
134 'data': data
135 };
136
137 debug('emitter (feed mount)', eventType, message);
138 message = cryptography.encrypt(socket.antisocial.friend.remotePublicKey, socket.antisocial.friend.keys.private, JSON.stringify(message));
139 socket.emit(eventType, message);
140 };
141
142 socket.on('highwater', function (data) {
143 var decrypted = cryptography.decrypt(socket.antisocial.friend.remotePublicKey, socket.antisocial.friend.keys.private, data);
144 if (!decrypted.valid) { // could not validate signature
145 console.log('WatchNewsFeedItem decryption signature validation error:', decrypted.invalidReason);
146 return;
147 }
148
149 data = decrypted.data;
150 debug('%s /antisocial-activity got highwater from %s %s', socket.id, socket.antisocial.key, data);
151
152 if (!decrypted.contentType || decrypted.contentType === 'application/json') {
153 try {
154 data = JSON.parse(decrypted.data);
155 }
156 catch (e) {
157 data = decrypted.data;
158 }
159 }
160
161 var appid = data.appId;
162 antisocialApp.emit('activity-backfill-' + appid, socket.antisocial.user, socket.antisocial.friend, data.data, socket.antisocial.emitter);
163 });
164
165 socket.on('data', function (data) {
166 debug('%s /antisocial-activity data from %s', socket.id, socket.antisocial.key);
167
168 var decrypted = cryptography.decrypt(socket.antisocial.friend.remotePublicKey, socket.antisocial.friend.keys.private, data);
169 if (!decrypted.valid) { // could not validate signature
170 debug('%s /antisocial-activity decryption signature validation error:', socket.id, decrypted.invalidReason);
171 return;
172 }
173
174 data = decrypted.data;
175
176 if (!decrypted.contentType || decrypted.contentType === 'application/json') {
177 try {
178 data = JSON.parse(decrypted.data);
179 }
180 catch (e) {
181 data = '';
182 }
183 }
184
185 var appid = data.appId;
186 debug('%s /antisocial-activity emitting activity-data-' + appid, socket.id);
187 antisocialApp.emit('activity-data-' + appid, socket.antisocial.user, socket.antisocial.friend, data.data);
188 });
189
190 socket.on('disconnect', function (reason) {
191 debug('%s /antisocial-activity disconnect %s %s', socket.id, socket.antisocial.key, reason);
192 antisocialApp.emit('close-activity-connection', socket.antisocial.user, socket.antisocial.friend, reason, socket.antisocial);
193 db.updateInstance('friends', socket.antisocial.friend.id, {
194 'online': false
195 });
196 delete antisocialApp.openActivityListeners[socket.antisocial.key];
197 });
198
199 antisocialApp.emit('open-activity-connection', socket.antisocial.user, socket.antisocial.friend, socket.antisocial.emitter, socket.antisocial);
200
201 db.updateInstance('friends', socket.antisocial.friend.id, {
202 'online': true
203 });
204 }
205 });
206};