1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 | var debug = require('debug')('antisocial-friends:activity');
|
10 | var url = require('url');
|
11 | var IOClient = require('socket.io-client');
|
12 | var cryptography = require('antisocial-encryption');
|
13 |
|
14 | module.exports = function (antisocialApp) {
|
15 |
|
16 | function activityFeedSubscribeConnect(currentUser, friend) {
|
17 | var db = antisocialApp.db;
|
18 |
|
19 | if (!antisocialApp.openActivityListeners) {
|
20 | antisocialApp.openActivityListeners = {};
|
21 | }
|
22 |
|
23 | var key = currentUser.username + '<-' + friend.remoteEndPoint;
|
24 |
|
25 | if (antisocialApp.openActivityListeners[key]) {
|
26 | debug('subscribe aborted, already connected %s', key);
|
27 | return;
|
28 | }
|
29 |
|
30 | var remoteEndPoint = url.parse(friend.remoteEndPoint);
|
31 | var endpoint = remoteEndPoint.protocol === 'https:' ? 'wss' : 'ws';
|
32 | endpoint += '://' + remoteEndPoint.host;
|
33 |
|
34 | debug('subscribe to ' + endpoint + '/antisocial-activity key:' + key);
|
35 |
|
36 |
|
37 | var socket = IOClient(endpoint, {
|
38 | 'path': '/antisocial-activity',
|
39 | 'reconnection': false
|
40 | });
|
41 |
|
42 |
|
43 | socket.on('connect', function () {
|
44 | debug('%s subscribe client connected', socket.id);
|
45 |
|
46 | socket.on('unauthorized', function (err) {
|
47 | debug('%s subscribe client unauthorized', socket.id, err.message);
|
48 | });
|
49 |
|
50 | socket.on('error', function () {
|
51 | debug('%s subscribe client error', socket.id);
|
52 | });
|
53 |
|
54 | socket.on('authenticated', function () {
|
55 | debug('%s subscribe client authenticated', socket.id);
|
56 |
|
57 | socket.antisocial = {
|
58 | 'key': key,
|
59 | 'friend': friend,
|
60 | 'user': currentUser
|
61 | };
|
62 |
|
63 | antisocialApp.openActivityListeners[socket.antisocial.key] = socket;
|
64 |
|
65 | socket.antisocial.emitter = function (appId, eventType, data) {
|
66 | var message = {
|
67 | 'appId': appId,
|
68 | 'data': data
|
69 | };
|
70 | debug('emitter (feed-subscribe)', eventType, message);
|
71 | message = cryptography.encrypt(socket.antisocial.friend.remotePublicKey, socket.antisocial.friend.keys.private, JSON.stringify(message));
|
72 | socket.emit(eventType, message);
|
73 | };
|
74 |
|
75 | socket.on('highwater', function (data) {
|
76 | var decrypted = cryptography.decrypt(socket.antisocial.friend.remotePublicKey, socket.antisocial.friend.keys.private, data);
|
77 | if (!decrypted.valid) {
|
78 | console.log('WatchNewsFeedItem decryption signature validation error:', decrypted.invalidReason);
|
79 | return;
|
80 | }
|
81 |
|
82 | data = decrypted.data;
|
83 | debug('%s subscribe got highwater from %s %s', socket.id, socket.antisocial.key, data);
|
84 |
|
85 | if (!decrypted.contentType || decrypted.contentType === 'application/json') {
|
86 | try {
|
87 | data = JSON.parse(decrypted.data);
|
88 | }
|
89 | catch (e) {
|
90 | data = decrypted.data;
|
91 | }
|
92 | }
|
93 |
|
94 | var appid = data.appId;
|
95 | antisocialApp.emit('activity-backfill-' + appid, socket.antisocial.user, socket.antisocial.friend, data.data, socket.antisocial.emitter);
|
96 | });
|
97 |
|
98 | socket.on('data', function (data) {
|
99 | debug('%s /antisocial-activity data from %s', socket.id, socket.antisocial.key);
|
100 |
|
101 | var decrypted = cryptography.decrypt(socket.antisocial.friend.remotePublicKey, socket.antisocial.friend.keys.private, data);
|
102 | if (!decrypted.valid) {
|
103 | console.log('WatchNewsFeedItem decryption signature validation error:', decrypted.invalidReason);
|
104 | return;
|
105 | }
|
106 |
|
107 | data = decrypted.data;
|
108 |
|
109 | if (!decrypted.contentType || decrypted.contentType === 'application/json') {
|
110 | try {
|
111 | data = JSON.parse(decrypted.data);
|
112 | }
|
113 | catch (e) {
|
114 | data = decrypted.data;
|
115 | }
|
116 | }
|
117 |
|
118 | var appid = data.appId;
|
119 | debug('%s emitting activity-data-' + appid, socket.id);
|
120 | antisocialApp.emit('activity-data-' + appid, socket.antisocial.user, socket.antisocial.friend, data.data);
|
121 | });
|
122 |
|
123 | socket.on('disconnect', function (reason) {
|
124 | antisocialApp.emit('close-activity-connection', socket.antisocial.user, socket.antisocial.friend, reason, socket.antisocial);
|
125 |
|
126 | db.updateInstance('friends', socket.antisocial.friend.id, {
|
127 | 'online': false
|
128 | });
|
129 |
|
130 | delete antisocialApp.openActivityListeners[socket.antisocial.key];
|
131 | });
|
132 |
|
133 | antisocialApp.emit('open-activity-connection', socket.antisocial.user, socket.antisocial.friend, socket.antisocial.emitter, socket.antisocial);
|
134 |
|
135 | db.updateInstance('friends', socket.antisocial.friend.id, {
|
136 | 'online': true
|
137 | });
|
138 | });
|
139 |
|
140 |
|
141 | debug('%s subscribe authenticating', socket.id);
|
142 | socket.emit('authentication', {
|
143 | 'username': friend.remoteUsername,
|
144 | 'friendAccessToken': friend.remoteAccessToken
|
145 | });
|
146 | });
|
147 | }
|
148 |
|
149 | function activityFeedSubscribeDisconnect(currentUser, friend, cb) {
|
150 | for (var key in antisocialApp.openActivityListeners) {
|
151 | var socket = antisocialApp.openActivityListeners[key];
|
152 | if (socket.antisocial.friend.id.toString() === friend.id.toString()) {
|
153 | debug('%s subscribe disconnect %s', socket.id, socket.antisocial.key);
|
154 | socket.disconnect(true);
|
155 | delete antisocialApp.openActivityListeners[key];
|
156 | return cb(null);
|
157 | }
|
158 | }
|
159 | return cb(new Error('activityFeedSubscribeDisconnect not connected'));
|
160 | }
|
161 |
|
162 | return {
|
163 | 'connect': activityFeedSubscribeConnect,
|
164 | 'disconnect': activityFeedSubscribeDisconnect
|
165 | };
|
166 |
|
167 | };
|