1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 | var debug = require('debug')('antisocial-friends:activity');
|
10 | var url = require('url');
|
11 | var IOClient = require('socket.io-client');
|
12 | var cryptography = require('antisocial-encryption');
|
13 | var refresh = require('./utilities').refresh;
|
14 |
|
15 | module.exports = function (antisocialApp) {
|
16 |
|
/**
 * Log a failed `refresh` call without risking a secondary exception.
 *
 * The error's cause is read defensively: `cause` may be a method, a plain
 * property, or absent, and a missing cause must not throw from inside the
 * error-handling path.
 *
 * @param {string} label - Prefix naming the handler that failed (e.g. 'data event').
 * @param {Error} err - The error passed to the `refresh` callback.
 */
function logRefreshError(label, err) {
  var cause = typeof err.cause === 'function' ? err.cause() : err.cause;
  console.log(label + ' socket data refresh error', err.message, cause ? cause.message : undefined);
}

/**
 * Decrypt an incoming activity message and parse it into an object.
 *
 * Uses the friend's remote public key and the local private key stored on
 * `socket.antisocial.friend`. When no content type is reported, or it is
 * 'application/json', the decrypted text is parsed as JSON (falling back to
 * the raw text if parsing fails).
 *
 * @param {Object} socket - The socket whose `antisocial.friend` holds the keys.
 * @param {*} data - The encrypted message as received from the wire.
 * @returns {Object|null} The decoded message, or null when the signature is
 *   invalid or the decoded value is not an object (so it cannot carry an
 *   `appId` to route on).
 */
function decodeActivityMessage(socket, data) {
  var friend = socket.antisocial.friend;
  var decrypted = cryptography.decrypt(friend.remotePublicKey, friend.keys.private, data);

  if (!decrypted.valid) {
    console.log('WatchNewsFeedItem decryption signature validation error:', decrypted.invalidReason);
    return null;
  }

  var payload = decrypted.data;
  if (!decrypted.contentType || decrypted.contentType === 'application/json') {
    try {
      payload = JSON.parse(decrypted.data);
    }
    catch (e) {
      // Not valid JSON: keep the raw decrypted value.
      payload = decrypted.data;
    }
  }

  // A null or primitive payload has no appId; reading it would either throw
  // (null) or route the event to 'activity-...-undefined'.
  if (payload === null || typeof payload !== 'object') {
    debug('%s dropping activity message without an object payload', socket.id);
    return null;
  }

  return payload;
}

/**
 * Open a socket.io client connection to a friend's '/antisocial-activity'
 * endpoint and wire it into the application.
 *
 * The connection is keyed by `<username><-<remoteEndPoint>`; if a socket is
 * already registered under that key in `antisocialApp.openActivityListeners`
 * the call is a no-op. After the 'connect' event the client emits
 * 'authentication'; once 'authenticated' is received the socket is
 * registered, the friend is marked online in the db, and
 * 'open-activity-connection' is emitted on `antisocialApp`.
 *
 * Incoming 'highwater' and 'data' events are decrypted and re-emitted on
 * `antisocialApp` as 'activity-backfill-<appId>' and 'activity-data-<appId>'.
 *
 * NOTE(review): `refresh` (from ./utilities) is called before each use of
 * `socket.antisocial`; presumably it reloads the user/friend records —
 * confirm against its implementation.
 *
 * @param {Object} currentUser - Local user; `username` is read here.
 * @param {Object} friend - Friend record; `remoteEndPoint`, `remoteUsername`
 *   and `remoteAccessToken` are read here.
 * @returns {undefined}
 */
function activityFeedSubscribeConnect(currentUser, friend) {
  var db = antisocialApp.db;

  if (!antisocialApp.openActivityListeners) {
    antisocialApp.openActivityListeners = {};
  }

  var key = currentUser.username + '<-' + friend.remoteEndPoint;

  if (antisocialApp.openActivityListeners[key]) {
    debug('subscribe aborted, already connected %s', key);
    return;
  }

  // Derive the websocket origin (scheme + host) from the friend's endpoint URL.
  var remoteEndPoint = url.parse(friend.remoteEndPoint);
  var endpoint = remoteEndPoint.protocol === 'https:' ? 'wss' : 'ws';
  endpoint += '://' + remoteEndPoint.host;

  debug('subscribe to ' + endpoint + '/antisocial-activity key:' + key);

  var socket = IOClient(endpoint, {
    'path': '/antisocial-activity',
    'reconnection': false
  });

  socket.on('connect', function () {
    debug('%s subscribe client connected', socket.id);

    socket.on('unauthorized', function (err) {
      debug('%s subscribe client unauthorized', socket.id, err && err.message);
    });

    socket.on('error', function () {
      debug('%s subscribe client error', socket.id);
    });

    socket.on('authenticated', function () {
      debug('%s subscribe client authenticated', socket.id);

      socket.antisocial = {
        'key': key,
        'friend': friend,
        'user': currentUser
      };

      antisocialApp.openActivityListeners[socket.antisocial.key] = socket;

      // Handed to application code so it can send encrypted messages back
      // to the friend over this socket.
      socket.antisocial.emitter = function (appId, eventType, data) {
        refresh(antisocialApp, socket, function (err) {
          if (err) {
            logRefreshError('emitter', err);
            return;
          }

          var message = {
            'appId': appId,
            'data': data
          };
          debug('emitter (feed-subscribe)', eventType, message);
          message = cryptography.encrypt(socket.antisocial.friend.remotePublicKey, socket.antisocial.friend.keys.private, JSON.stringify(message));
          socket.emit(eventType, message);
        });
      };

      socket.on('highwater', function (data) {
        refresh(antisocialApp, socket, function (err) {
          if (err) {
            logRefreshError('highwater event', err);
            return;
          }

          var message = decodeActivityMessage(socket, data);
          if (!message) {
            return;
          }

          debug('%s subscribe got highwater from %s %s', socket.id, socket.antisocial.key, message);

          antisocialApp.emit('activity-backfill-' + message.appId, socket.antisocial.user, socket.antisocial.friend, message.data, socket.antisocial.emitter);
        });
      });

      socket.on('data', function (data) {
        refresh(antisocialApp, socket, function (err) {
          if (err) {
            logRefreshError('data event', err);
            return;
          }

          debug('%s /antisocial-activity data from %s', socket.id, socket.antisocial.key);

          var message = decodeActivityMessage(socket, data);
          if (!message) {
            return;
          }

          debug('%s emitting activity-data-' + message.appId, socket.id);
          antisocialApp.emit('activity-data-' + message.appId, socket.antisocial.user, socket.antisocial.friend, message.data);
        });
      });

      socket.on('disconnect', function (reason) {
        // Free the registry slot only if it still belongs to this socket, so
        // a newer connection registered under the same key is not evicted.
        function releaseListener() {
          var registeredKey = socket.antisocial.key;
          if (antisocialApp.openActivityListeners[registeredKey] === socket) {
            delete antisocialApp.openActivityListeners[registeredKey];
          }
        }

        refresh(antisocialApp, socket, function (err) {
          if (err) {
            logRefreshError('disconnect event', err);
            // The socket is gone regardless of the refresh outcome; leaving
            // the entry behind would block every future connect for this key.
            releaseListener();
            return;
          }

          antisocialApp.emit('close-activity-connection', socket.antisocial.user, socket.antisocial.friend, reason, socket.antisocial);

          db.updateInstance('friends', socket.antisocial.friend.id, {
            'online': false
          });

          releaseListener();
        });
      });

      antisocialApp.emit('open-activity-connection', socket.antisocial.user, socket.antisocial.friend, socket.antisocial.emitter, socket.antisocial);

      db.updateInstance('friends', socket.antisocial.friend.id, {
        'online': true
      });
    });

    debug('%s subscribe authenticating', socket.id);
    socket.emit('authentication', {
      'username': friend.remoteUsername,
      'friendAccessToken': friend.remoteAccessToken
    });
  });
}
|
177 |
|
/**
 * Disconnect the activity socket registered for a friend.
 *
 * Scans `antisocialApp.openActivityListeners` for the first socket whose
 * `antisocial.friend.id` matches `friend.id` (compared as strings),
 * disconnects it and removes it from the registry.
 *
 * Entries that do not carry a friend id are skipped rather than dereferenced,
 * so a single malformed entry cannot prevent the matching socket from being
 * found.
 *
 * @param {Object} currentUser - Unused here; kept for interface compatibility.
 * @param {Object} friend - Friend record; only `id` is read.
 * @param {function(?Error)} cb - Called with null on success, or with an
 *   Error when no socket is registered for the friend.
 * @returns {*} Whatever `cb` returns.
 */
function activityFeedSubscribeDisconnect(currentUser, friend, cb) {
  // The registry is created lazily by connect; treat "never connected" as empty.
  var listeners = antisocialApp.openActivityListeners || {};
  var keys = Object.keys(listeners);
  var hasWantedId = friend.id !== null && friend.id !== undefined;
  var wantedId = String(friend.id);

  for (var i = 0; hasWantedId && i < keys.length; i++) {
    var key = keys[i];
    var socket = listeners[key];
    var registeredFriend = socket && socket.antisocial && socket.antisocial.friend;

    if (!registeredFriend || registeredFriend.id === null || registeredFriend.id === undefined) {
      continue;
    }

    if (String(registeredFriend.id) === wantedId) {
      debug('%s subscribe disconnect %s', socket.id, socket.antisocial.key);
      socket.disconnect(true);
      delete listeners[key];
      return cb(null);
    }
  }

  return cb(new Error('activityFeedSubscribeDisconnect not connected'));
}
|
190 |
|
191 | return {
|
192 | 'connect': activityFeedSubscribeConnect,
|
193 | 'disconnect': activityFeedSubscribeDisconnect
|
194 | };
|
195 |
|
196 | };
|