UNPKG

5.73 kBJavaScriptView Raw
1// Copyright Michael Rhodes. 2017,2018. All Rights Reserved.
2// This file is licensed under the MIT License.
3// License text available at https://opensource.org/licenses/MIT
4
5/*
6 mount socket.io listener for incoming activity connections (server to server friends)
7*/
8
9var debug = require('debug')('antisocial-friends:activity');
10var VError = require('verror').VError;
11var async = require('async');
12var IO = require('socket.io');
13var IOAuth = require('socketio-auth');
14var cryptography = require('antisocial-encryption');
15var moment = require('moment');
16
17module.exports = function activityFeedMount(antisocialApp, expressListener) {
18 var config = antisocialApp.config;
19 var db = antisocialApp.db;
20 var authUserMiddleware = antisocialApp.authUserMiddleware;
21
22 debug('mounting ws /antisocial-activity');
23
24 if (!antisocialApp.openActivityListeners) {
25 antisocialApp.openActivityListeners = {};
26 }
27
28 antisocialApp.ioActivity = IO(expressListener, {
29 'path': '/antisocial-activity'
30 });
31
32 antisocialApp.ioActivity.on('connect', function (soc) {
33 debug('got connect', soc.id);
34 soc.on('disconnect', function (e) {
35 debug('got disconnect %s %s', soc.id, e);
36 });
37 soc.on('error', function (e) {
38 debug('got error %s %s', soc.id, e);
39 });
40 });
41
42 // friend activity feed
43 // authenticate using friendAccessToken
44 // then set up model observers and backfill any news since last connected
45
46 IOAuth(antisocialApp.ioActivity, {
47 'timeout': 60000,
48 'authenticate': function (socket, data, callback) {
49 debug('authenticate %s', socket.id);
50
51 if (!data.friendAccessToken) {
52 callback(new VError('friendAccessToken not supplied'), false);
53 }
54 if (!data.username) {
55 callback(new VError('username not supplied'), false);
56 }
57
58 async.waterfall([
59 function getUser(cb) {
60 db.getInstances('users', [{
61 'property': 'username',
62 'value': data.username
63 }], function (err, userInstances) {
64 if (err) {
65 return cb(new VError(err, 'user not found'));
66 }
67
68 if (userInstances.length > 1) {
69 return cb(new VError('more than one user matching username'));
70 }
71
72 if (!userInstances.length) {
73 return cb(new VError('user not found'));
74 }
75
76 var user = userInstances[0];
77
78 cb(err, user);
79 });
80 },
81 function findFriend(user, cb) {
82 db.getInstances('friends', [{
83 'property': 'userId',
84 'value': user.id
85 }, {
86 'property': 'localAccessToken',
87 'value': data.friendAccessToken
88 }], function (err, friendInstances) {
89 if (err) {
90 return cb(new VError(err, 'error reading friends'));
91 }
92
93 if (!friendInstances.length) {
94 return cb(new VError(err, 'friend not found'));
95 }
96
97 cb(err, user, friendInstances[0]);
98 });
99 }
100 ], function (err, user, friend) {
101 if (err) {
102 debug('authenticate error %s', err.message);
103 return callback(err);
104 }
105 if (friend.status !== 'accepted') {
106 return callback(new VError(err, 'friend not accepted'), false);
107 }
108 data.friend = friend;
109 data.user = user;
110
111 var key = data.user.username + '<-' + data.friend.remoteEndPoin;
112
113 if (antisocialApp.openActivityListeners[key]) {
114 debug('activityFeedSubscribeConnect abort already connected %s', key);
115 return callback(new VError(err, 'already connected ' + key), false);
116 }
117
118 callback(null, true);
119 });
120 },
121 'postAuthenticate': function (socket, data) {
122 debug('postAuthenticate %s', socket.id);
123
124 socket.antisocial = {
125 'friend': data.friend,
126 'user': data.user,
127 'key': data.user.username + '<-' + data.friend.remoteEndPoint,
128 'setDataHandler': function setDataHandler(handler) {
129 socket.antisocial.dataHandler = handler;
130 }
131 };
132
133 debug('connection established %s %s', socket.id, socket.antisocial.key);
134
135 antisocialApp.openActivityListeners[socket.antisocial.key] = socket;
136
137 antisocialApp.emit('open-activity-connection', {
138 'info': socket.antisocial,
139 'socket': socket
140 });
141
142 socket.on('highwater', function (highwater) {
143 debug('got highwater from %s %s', socket.id, socket.antisocial.key, highwater);
144 antisocialApp.emit('activity-backfill', {
145 'info': socket.antisocial,
146 'socket': socket,
147 'highwater': highwater
148 });
149 });
150
151 socket.on('data', function (data) {
152 debug('got data from %s %s', socket.id, socket.antisocial.key);
153
154 var decrypted = cryptography.decrypt(socket.antisocial.friend.remotePublicKey, socket.antisocial.friend.keys.private, data);
155 if (!decrypted.valid) { // could not validate signature
156 debug('decryption signature validation error:', decrypted.invalidReason);
157 return;
158 }
159
160 data = decrypted.data;
161
162 if (!decrypted.contentType || decrypted.contentType === 'application/json') {
163 try {
164 data = JSON.parse(decrypted.data);
165 }
166 catch (e) {
167 data = '';
168 }
169 }
170
171 if (socket.antisocial.dataHandler) {
172 socket.antisocial.dataHandler(data);
173 }
174 else {
175 debug('no data handler for %s', socket.antisocial.key);
176 }
177 });
178
179 socket.on('disconnect', function (reason) {
180 debug('got disconnect %s %s %s', socket.id, socket.antisocial.key, reason);
181 antisocialApp.emit('close-activity-connection', {
182 'info': socket.antisocial,
183 'reason': reason
184 });
185 db.updateInstance('friends', socket.antisocial.friend.id, {
186 'online': false
187 });
188 delete antisocialApp.openActivityListeners[socket.antisocial.key];
189 });
190
191 socket.emit('highwater', socket.antisocial.friend.highWater ? socket.antisocial.friend.highWater : moment().subtract(7, 'd').toISOString());
192
193 db.updateInstance('friends', socket.antisocial.friend.id, {
194 'online': true
195 });
196 }
197 });
198};