1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 |
|
8 |
|
9 | var debug = require('debug')('antisocial-friends-feeds');
|
10 | var url = require('url');
|
11 | var IOClient = require('socket.io-client');
|
12 | var cryptography = require('antisocial-encryption');
|
13 | var moment = require('moment');
|
14 |
|
/**
 * Open a socket.io client connection to a friend's activity feed and wire it
 * into the application.
 *
 * Once the remote side reports 'authenticated', the socket is recorded in
 * `antisocialApp.openActivityListeners` under the key
 * `<currentUser.username><-<friend.remoteEndPoint>` and the following events
 * are emitted on `antisocialApp`:
 *   - 'open-activity-connection'  { info, socket } after authentication
 *   - 'activity-backfill'         { info, socket, highwater } on remote 'highwater'
 *   - 'close-activity-connection' { info, reason } on disconnect
 *
 * Does nothing if a listener is already recorded under the same key.
 *
 * @param {Object} antisocialApp - application object; this function reads
 *   `db`, reads/writes `openActivityListeners` and calls `emit`.
 * @param {Object} currentUser - local user; only `username` is read here.
 * @param {Object} friend - friend record; reads `remoteEndPoint`,
 *   `remoteUsername`, `remoteAccessToken`, `highWater`, `id`,
 *   `remotePublicKey` and `keys.private`.
 * @returns {undefined}
 */
module.exports.connect = function activityFeedSubscribeConnect(antisocialApp, currentUser, friend) {
  var db = antisocialApp.db;

  if (!antisocialApp.openActivityListeners) {
    antisocialApp.openActivityListeners = {};
  }

  var key = currentUser.username + '<-' + friend.remoteEndPoint;

  if (antisocialApp.openActivityListeners[key]) {
    debug('activityFeedSubscribeConnect abort already connected %s', key);
    return;
  }

  // Derive the websocket origin (scheme + host) from the friend's endpoint URL.
  var remoteEndPoint = url.parse(friend.remoteEndPoint);
  var endpoint = remoteEndPoint.protocol === 'https:' ? 'wss' : 'ws';
  endpoint += '://' + remoteEndPoint.host;

  debug('attempting to subscribe to ' + endpoint + '/antisocial-activity');

  var socket = IOClient(endpoint, {
    'path': '/antisocial-activity'
  });

  // Every handler below is registered exactly once, at socket creation.
  // socket.io-client fires 'connect' again after each successful reconnection,
  // so registering handlers from inside the 'connect' callback would stack up
  // duplicates and make later events fire several times.

  socket.on('reconnecting', function (attempt) {
    debug('attempting reconnect %s %s', endpoint, attempt);
  });

  socket.on('reconnect_error', function (err) {
    // The format string has two placeholders; supply the failure reason for the second.
    debug('reconnect attempt failed %s %s', endpoint, err && err.message);
  });

  socket.on('error', function () {
    debug('client error');
  });

  socket.on('unauthorized', function (err) {
    debug('client unauthorized', err && err.message);
  });

  socket.on('authenticated', function () {
    debug('client authenticated');

    // Rebuilt on every (re)authentication; consumers receive it through
    // 'open-activity-connection' and install their handler via setDataHandler.
    socket.antisocial = {
      'key': key,
      'friend': friend,
      'user': currentUser,
      'setDataHandler': function setDataHandler(handler) {
        socket.antisocial.dataHandler = handler;
      }
    };

    antisocialApp.openActivityListeners[socket.antisocial.key] = socket;

    antisocialApp.emit('open-activity-connection', {
      'info': socket.antisocial,
      'socket': socket
    });

    // Tell the remote side where to resume from; fall back to seven days ago
    // when no high-water mark is stored on the friend record.
    socket.emit('highwater', socket.antisocial.friend.highWater ? socket.antisocial.friend.highWater : moment().subtract(7, 'd').toISOString());

    db.updateInstance('friends', socket.antisocial.friend.id, {
      'online': true
    });
  });

  socket.on('highwater', function (highwater) {
    if (!socket.antisocial) {
      // Not authenticated yet: there is no connection info to report against.
      return;
    }

    debug('got highwater from %s %j', socket.antisocial.friend.id, highwater);
    antisocialApp.emit('activity-backfill', {
      'info': socket.antisocial,
      'socket': socket,
      'highwater': highwater
    });
  });

  socket.on('data', function (data) {
    if (!socket.antisocial) {
      // Not authenticated yet: ignore anything that arrives early.
      return;
    }

    var decrypted = cryptography.decrypt(socket.antisocial.friend.remotePublicKey, socket.antisocial.friend.keys.private, data);
    if (!decrypted.valid) {
      console.log('WatchNewsFeedItem decryption signature validation error:', decrypted.invalidReason);
      return;
    }

    // Payloads are parsed as JSON when possible; anything that does not parse
    // is handed to the data handler as the raw decrypted value.
    var payload;
    try {
      payload = JSON.parse(decrypted.data);
    }
    catch (e) {
      payload = decrypted.data;
    }

    if (socket.antisocial.dataHandler) {
      socket.antisocial.dataHandler(payload);
    }
  });

  socket.on('disconnect', function (reason) {
    if (!socket.antisocial) {
      // Dropped before authentication ever completed: nothing was opened,
      // so there is nothing to close or unregister.
      return;
    }

    antisocialApp.emit('close-activity-connection', {
      'info': socket.antisocial,
      'reason': reason
    });

    db.updateInstance('friends', socket.antisocial.friend.id, {
      'online': false
    });

    delete antisocialApp.openActivityListeners[socket.antisocial.key];
  });

  socket.on('connect', function () {
    debug('client connected');

    // Authenticate on every (re)connect.
    debug('authenticating');
    socket.emit('authentication', {
      'username': friend.remoteUsername,
      'friendAccessToken': friend.remoteAccessToken,
      'friendHighWater': friend.highWater
    });
  });
};
|
143 |
|
/**
 * Disconnect every tracked activity-feed socket that belongs to `friend` and
 * remove it from `antisocialApp.openActivityListeners`.
 *
 * Sockets are matched on the `friend.id` stored in the `socket.antisocial`
 * metadata object (the object `connect` attaches after authentication).
 *
 * @param {Object} antisocialApp - application object holding `openActivityListeners`.
 * @param {Object} config - unused here; kept for interface compatibility.
 * @param {Object} dbAdaptor - unused here; kept for interface compatibility.
 * @param {Object} currentUser - unused here; kept for interface compatibility.
 * @param {Object} friend - friend record; only `id` is read.
 * @returns {undefined}
 */
module.exports.disconnect = function activityFeedSubscribeDisconnect(antisocialApp, config, dbAdaptor, currentUser, friend) {
  var listeners = antisocialApp.openActivityListeners;
  if (!listeners) {
    // connect() has never run, so nothing is being tracked.
    return;
  }

  var friendId = friend.id.toString();

  // Iterate over a snapshot of the keys: entries are deleted while looping,
  // both here and possibly by a socket's own 'disconnect' handler.
  Object.keys(listeners).forEach(function (key) {
    var socket = listeners[key];
    // Connection metadata lives on socket.antisocial (fields: key, friend, user).
    var info = socket && socket.antisocial;
    if (!info || info.friend.id.toString() !== friendId) {
      return;
    }

    debug('disconnect %s', info.key);
    socket.disconnect(true);
    delete listeners[key];
  });
};
|