UNPKG

5.04 kBJavaScriptView Raw
1import createDebug from 'debug';
2import WaitList from 'promise-waitlist';
3
4import Message from './protocol/message';
5import { Packet as PPacket } from './messages';
6
7const debug = createDebug('vowlink:sync-agent');
8
9const DEFAULT_TIMEOUT = 15 * 1000; // 15 seconds
10
11export default class SyncAgent {
12 constructor(options = {}) {
13 this.state = 'idle';
14 this.options = {
15 timeout: DEFAULT_TIMEOUT,
16 ...options,
17 };
18
19 this.sodium = this.options.sodium;
20 this.channel = this.options.channel;
21 this.socket = this.options.socket;
22
23 if (!this.sodium) {
24 throw new Error('Missing required `sodium` option');
25 }
26 if (!this.channel) {
27 throw new Error('Missing required `channel` option');
28 }
29 if (!this.socket) {
30 throw new Error('Missing required `socket` option');
31 }
32
33 this.timeoutWaitList = new WaitList();
34
35 this.seq = 0;
36 this.resolveResponse = {
37 // seq => Function
38 query: new Map(),
39 // seq => Function
40 bulk: new Map(),
41 };
42 }
43
44 destroy() {
45 this.timeoutWaitList.close(new Error('SyncAgent destroyed'));
46 }
47
48 async synchronize() {
49 this.debug('synchronize() state=%s', this.state);
50 if (this.state === 'idle') {
51 this.state = 'active';
52 } else if (this.state === 'active') {
53 this.state = 'pending';
54 } else {
55 // Already pending
56 return;
57 }
58
59 this.debug('synchronize() starting sync');
60 await this.channel.sync(this);
61 this.debug('synchronize() starting sync complete');
62
63 const isPending = this.state === 'pending';
64 this.state = 'idle';
65
66 // Repeat synchronization if it was pending
67 if (isPending) {
68 this.debug('synchronize() restarting sync');
69 return await this.synchronize();
70 }
71 }
72
73 async receiveQuery(query) {
74 this.debug('receiveQuery() seq=%d', query.seq);
75 const cursor = query.cursor === 'hash' ? { hash: query.hash } :
76 { height: query.height };
77
78 const result = await this.channel.query(
79 cursor, query.isBackward, query.limit);
80
81 const queryResponse = {
82 channelId: query.channelId,
83 seq: query.seq,
84 ...result,
85 };
86
87 await this.socket.send(PPacket.encode({
88 queryResponse,
89 }).finish());
90 }
91
92 async receiveBulk(bulk) {
93 this.debug('receiveBulk() seq=%d', bulk.seq);
94 const result = await this.channel.bulk(bulk.hashes);
95
96 const bulkResponse = {
97 channelId: bulk.channelId,
98 seq: bulk.seq,
99 ...result,
100 };
101
102 await this.socket.send(PPacket.encode({
103 bulkResponse,
104 }).finish());
105 }
106
107 async receiveQueryResponse(response) {
108 this.debug('receiveQueryResponse() seq=%d', response.seq);
109 const resolve = this.resolveResponse.query.get(response.seq);
110 if (!resolve) {
111 throw new Error('Unexpected QueryResponse');
112 }
113
114 resolve(response);
115 }
116
117 async receiveBulkResponse(response) {
118 this.debug('receiveBulkResponse() seq=%d', response.seq);
119 const resolve = this.resolveResponse.bulk.get(response.seq);
120 if (!resolve) {
121 throw new Error('Unexpected BulkResponse');
122 }
123
124 resolve(response);
125 }
126
127 //
128 // Synchronization methods for Channel remote
129 //
130
131 async query(cursor, isBackward, limit) {
132 const seq = this.getNextSeq();
133 const packet = Object.assign({
134 channelId: this.channel.id,
135 seq,
136 isBackward,
137 limit,
138 }, cursor.hash ? { hash: cursor.hash } : { height: cursor.height });
139
140 const response = await this.sendAndWait('query', seq, { query: packet });
141
142 return {
143 abbreviatedMessages: response.abbreviatedMessages,
144 forwardHash: response.forwardHash.length === 0 ? null :
145 response.forwardHash,
146 backwardHash: response.backwardHash.length === 0 ? null :
147 response.backwardHash,
148 };
149 }
150
151 async bulk(hashes) {
152 const seq = this.getNextSeq();
153 const packet = {
154 channelId: this.channel.id,
155 seq,
156 hashes,
157 };
158
159 const response = await this.sendAndWait('bulk', seq, { bulk: packet });
160
161 return {
162 messages: response.messages.map((decoded) => {
163 return Message.deserialize(decoded, { sodium: this.sodium });
164 }),
165 forwardIndex: response.forwardIndex,
166 };
167 }
168
169 //
170 // Utils
171 //
172
173 async sendAndWait(type, seq, packet) {
174 const queryResponse = new Promise((resolve) => {
175 this.resolveResponse[type].set(seq, resolve);
176 });
177
178 this.debug('sendAndWait %s seq=%d waiting=%d',
179 type,
180 seq,
181 this.resolveResponse[type].size);
182
183 await this.socket.send(PPacket.encode(packet).finish());
184
185 const entry = this.timeoutWaitList.waitFor(null, this.options.timeout);
186
187 try {
188 return await Promise.race([
189 queryResponse,
190 entry.promise,
191 ]);
192 } finally {
193 this.resolveResponse[type].delete(seq);
194 entry.cancel();
195 }
196 }
197
198 getNextSeq() {
199 const result = this.seq;
200 this.seq = (this.seq + 1) >>> 0;
201 return result;
202 }
203
204 debug(fmt, ...args) {
205 debug('channel.id=%s ' + fmt, ...[ this.channel.debugId ].concat(args));
206 }
207}