1 | import createDebug from 'debug';
|
2 | import WaitList from 'promise-waitlist';
|
3 |
|
4 | import Message from './protocol/message';
|
5 | import { Packet as PPacket } from './messages';
|
6 |
|
7 | const debug = createDebug('vowlink:sync-agent');
|
8 |
|
9 | const DEFAULT_TIMEOUT = 15 * 1000;
|
10 |
|
11 | export default class SyncAgent {
|
12 | constructor(options = {}) {
|
13 | this.state = 'idle';
|
14 | this.options = {
|
15 | timeout: DEFAULT_TIMEOUT,
|
16 | ...options,
|
17 | };
|
18 |
|
19 | this.sodium = this.options.sodium;
|
20 | this.channel = this.options.channel;
|
21 | this.socket = this.options.socket;
|
22 |
|
23 | if (!this.sodium) {
|
24 | throw new Error('Missing required `sodium` option');
|
25 | }
|
26 | if (!this.channel) {
|
27 | throw new Error('Missing required `channel` option');
|
28 | }
|
29 | if (!this.socket) {
|
30 | throw new Error('Missing required `socket` option');
|
31 | }
|
32 |
|
33 | this.timeoutWaitList = new WaitList();
|
34 |
|
35 | this.seq = 0;
|
36 | this.resolveResponse = {
|
37 |
|
38 | query: new Map(),
|
39 |
|
40 | bulk: new Map(),
|
41 | };
|
42 | }
|
43 |
|
44 | destroy() {
|
45 | this.timeoutWaitList.close(new Error('SyncAgent destroyed'));
|
46 | }
|
47 |
|
48 | async synchronize() {
|
49 | this.debug('synchronize() state=%s', this.state);
|
50 | if (this.state === 'idle') {
|
51 | this.state = 'active';
|
52 | } else if (this.state === 'active') {
|
53 | this.state = 'pending';
|
54 | } else {
|
55 |
|
56 | return;
|
57 | }
|
58 |
|
59 | this.debug('synchronize() starting sync');
|
60 | await this.channel.sync(this);
|
61 | this.debug('synchronize() starting sync complete');
|
62 |
|
63 | const isPending = this.state === 'pending';
|
64 | this.state = 'idle';
|
65 |
|
66 |
|
67 | if (isPending) {
|
68 | this.debug('synchronize() restarting sync');
|
69 | return await this.synchronize();
|
70 | }
|
71 | }
|
72 |
|
73 | async receiveQuery(query) {
|
74 | this.debug('receiveQuery() seq=%d', query.seq);
|
75 | const cursor = query.cursor === 'hash' ? { hash: query.hash } :
|
76 | { height: query.height };
|
77 |
|
78 | const result = await this.channel.query(
|
79 | cursor, query.isBackward, query.limit);
|
80 |
|
81 | const queryResponse = {
|
82 | channelId: query.channelId,
|
83 | seq: query.seq,
|
84 | ...result,
|
85 | };
|
86 |
|
87 | await this.socket.send(PPacket.encode({
|
88 | queryResponse,
|
89 | }).finish());
|
90 | }
|
91 |
|
92 | async receiveBulk(bulk) {
|
93 | this.debug('receiveBulk() seq=%d', bulk.seq);
|
94 | const result = await this.channel.bulk(bulk.hashes);
|
95 |
|
96 | const bulkResponse = {
|
97 | channelId: bulk.channelId,
|
98 | seq: bulk.seq,
|
99 | ...result,
|
100 | };
|
101 |
|
102 | await this.socket.send(PPacket.encode({
|
103 | bulkResponse,
|
104 | }).finish());
|
105 | }
|
106 |
|
107 | async receiveQueryResponse(response) {
|
108 | this.debug('receiveQueryResponse() seq=%d', response.seq);
|
109 | const resolve = this.resolveResponse.query.get(response.seq);
|
110 | if (!resolve) {
|
111 | throw new Error('Unexpected QueryResponse');
|
112 | }
|
113 |
|
114 | resolve(response);
|
115 | }
|
116 |
|
117 | async receiveBulkResponse(response) {
|
118 | this.debug('receiveBulkResponse() seq=%d', response.seq);
|
119 | const resolve = this.resolveResponse.bulk.get(response.seq);
|
120 | if (!resolve) {
|
121 | throw new Error('Unexpected BulkResponse');
|
122 | }
|
123 |
|
124 | resolve(response);
|
125 | }
|
126 |
|
127 |
|
128 |
|
129 |
|
130 |
|
131 | async query(cursor, isBackward, limit) {
|
132 | const seq = this.getNextSeq();
|
133 | const packet = Object.assign({
|
134 | channelId: this.channel.id,
|
135 | seq,
|
136 | isBackward,
|
137 | limit,
|
138 | }, cursor.hash ? { hash: cursor.hash } : { height: cursor.height });
|
139 |
|
140 | const response = await this.sendAndWait('query', seq, { query: packet });
|
141 |
|
142 | return {
|
143 | abbreviatedMessages: response.abbreviatedMessages,
|
144 | forwardHash: response.forwardHash.length === 0 ? null :
|
145 | response.forwardHash,
|
146 | backwardHash: response.backwardHash.length === 0 ? null :
|
147 | response.backwardHash,
|
148 | };
|
149 | }
|
150 |
|
151 | async bulk(hashes) {
|
152 | const seq = this.getNextSeq();
|
153 | const packet = {
|
154 | channelId: this.channel.id,
|
155 | seq,
|
156 | hashes,
|
157 | };
|
158 |
|
159 | const response = await this.sendAndWait('bulk', seq, { bulk: packet });
|
160 |
|
161 | return {
|
162 | messages: response.messages.map((decoded) => {
|
163 | return Message.deserialize(decoded, { sodium: this.sodium });
|
164 | }),
|
165 | forwardIndex: response.forwardIndex,
|
166 | };
|
167 | }
|
168 |
|
169 |
|
170 |
|
171 |
|
172 |
|
173 | async sendAndWait(type, seq, packet) {
|
174 | const queryResponse = new Promise((resolve) => {
|
175 | this.resolveResponse[type].set(seq, resolve);
|
176 | });
|
177 |
|
178 | this.debug('sendAndWait %s seq=%d waiting=%d',
|
179 | type,
|
180 | seq,
|
181 | this.resolveResponse[type].size);
|
182 |
|
183 | await this.socket.send(PPacket.encode(packet).finish());
|
184 |
|
185 | const entry = this.timeoutWaitList.waitFor(null, this.options.timeout);
|
186 |
|
187 | try {
|
188 | return await Promise.race([
|
189 | queryResponse,
|
190 | entry.promise,
|
191 | ]);
|
192 | } finally {
|
193 | this.resolveResponse[type].delete(seq);
|
194 | entry.cancel();
|
195 | }
|
196 | }
|
197 |
|
198 | getNextSeq() {
|
199 | const result = this.seq;
|
200 | this.seq = (this.seq + 1) >>> 0;
|
201 | return result;
|
202 | }
|
203 |
|
204 | debug(fmt, ...args) {
|
205 | debug('channel.id=%s ' + fmt, ...[ this.channel.debugId ].concat(args));
|
206 | }
|
207 | }
|