// UNPKG
//
// 7.19 kB · JavaScript · View Raw
// `io` is the lazy loader itself, not the socket.io module: call `io()` to
// obtain the client (the loader is a hoisted function declared further down).
const io = requireSocketIo
const Emitter = require('component-emitter')
const has = require('@uppy/utils/lib/hasProperty')
const parseUrl = require('./parseUrl')
const NetworkError = require('@uppy/utils/lib/NetworkError')
const fetchWithNetworkError = require('@uppy/utils/lib/fetchWithNetworkError')
7
8// Lazy load socket.io to avoid a console error
9// in IE 10 when the Transloadit plugin is not used.
10// (The console.error call comes from `buffer`. I
11// think we actually don't use that part of socket.io
12// at all…)
13let socketIo
14function requireSocketIo () {
15 if (!socketIo) {
16 socketIo = require('socket.io-client')
17 }
18 return socketIo
19}
20
// Assembly states, compared against the `ok` field of a status object.
const ASSEMBLY_UPLOADING = 'ASSEMBLY_UPLOADING'
const ASSEMBLY_EXECUTING = 'ASSEMBLY_EXECUTING'
const ASSEMBLY_COMPLETED = 'ASSEMBLY_COMPLETED'

// The states listed from earliest to latest. `isStatus` compares positions in
// this list, so it is frozen to keep the ordering from being altered.
const statusOrder = Object.freeze([
  ASSEMBLY_UPLOADING,
  ASSEMBLY_EXECUTING,
  ASSEMBLY_COMPLETED
])
30
/**
 * Check that an assembly status is equal to or later than some desired status.
 * Later statuses also count, so that a comparison like this works when the old
 * assembly status is UPLOADING but the new one is already COMPLETED:
 *
 *   !isStatus(oldStatus, ASSEMBLY_EXECUTING) && isStatus(newStatus, ASSEMBLY_EXECUTING)
 *
 * …so that we can emit the 'executing' event even if the execution step was so
 * fast that we missed it.
 *
 * A status that is not in `statusOrder` has position -1, so it never satisfies
 * a `test` that is in the list.
 *
 * @param {string} status The status to examine.
 * @param {string} test The status that must have been reached.
 * @returns {boolean} Whether `status` sits at or after `test` in `statusOrder`.
 */
function isStatus (status, test) {
  return statusOrder.indexOf(status) >= statusOrder.indexOf(test)
}
44
/**
 * Tracks a single assembly and re-emits its progress as events on itself:
 * 'connect', 'executing', 'upload', 'metadata', 'result', 'finished',
 * 'status' and 'error'.
 *
 * Updates come from a socket.io connection, with periodic requests to the
 * status endpoint as a fallback while the socket is not connected.
 */
class TransloaditAssembly extends Emitter {
  /**
   * @param {object} assembly The initial assembly status object. This class
   *   reads its `websocket_url`, `assembly_id`, `assembly_ssl_url`, `ok`,
   *   `error`, `uploads` and `results` fields.
   */
  constructor (assembly) {
    super()

    // The current assembly status.
    this.status = assembly
    // The socket.io connection.
    this.socket = null
    // The interval timer for full status updates.
    this.pollInterval = null
    // Whether `close()` has been called on this assembly.
    this.closed = false
  }

  /**
   * Start receiving updates: open the socket and start the fallback polling.
   */
  connect () {
    this._connectSocket()
    this._beginPolling()
  }

  /**
   * Announce completion, then stop all further updates.
   */
  _onFinished () {
    this.emit('finished')
    this.close()
  }

  /**
   * Open the socket.io connection named by `status.websocket_url` and
   * translate the messages it delivers into events on this emitter.
   */
  _connectSocket () {
    const parsed = parseUrl(this.status.websocket_url)
    const socket = io().connect(parsed.origin, {
      transports: ['websocket'],
      path: parsed.pathname
    })

    socket.on('connect', () => {
      socket.emit('assembly_connect', {
        id: this.status.assembly_id
      })

      this.emit('connect')
    })

    socket.on('connect_failed', () => {
      this._onError(new NetworkError('Transloadit Socket.io connection error'))
      this.socket = null
    })

    socket.on('error', () => {
      // Drop the socket; the poll timer fetches statuses while it is null.
      socket.disconnect()
      this.socket = null
    })

    socket.on('assembly_finished', () => {
      this._onFinished()
    })

    socket.on('assembly_upload_finished', (file) => {
      this.emit('upload', file)
      // `this.status` may have been replaced by a fetched status that has no
      // `uploads` list; create one rather than throwing inside the handler.
      if (!this.status.uploads) {
        this.status.uploads = []
      }
      this.status.uploads.push(file)
    })

    socket.on('assembly_uploading_finished', () => {
      this.emit('executing')
    })

    socket.on('assembly_upload_meta_data_extracted', () => {
      this.emit('metadata')
      this._fetchStatus({ diff: false })
    })

    socket.on('assembly_result_finished', (stepName, result) => {
      this.emit('result', stepName, result)
      // Same reasoning as for `uploads` above.
      if (!this.status.results) {
        this.status.results = {}
      }
      if (!this.status.results[stepName]) {
        this.status.results[stepName] = []
      }
      this.status.results[stepName].push(result)
    })

    socket.on('assembly_error', (err) => {
      this._onError(err)
      // Refetch for updated status code
      this._fetchStatus({ diff: false })
    })

    this.socket = socket
  }

  /**
   * Emit an 'error' event carrying a real `Error` instance that has the
   * message and all own enumerable properties of `err`.
   *
   * @param {object} err An error, or an error-like object with a `message`.
   */
  _onError (err) {
    this.emit('error', Object.assign(new Error(err.message), err))
  }

  /**
   * Begin polling for assembly status changes. This sends a request to the
   * assembly status endpoint every 2 seconds, if the socket is not connected.
   * If the socket connection fails or takes a long time, we won't miss any
   * events.
   */
  _beginPolling () {
    this.pollInterval = setInterval(() => {
      if (!this.socket || !this.socket.connected) {
        this._fetchStatus()
      }
    }, 2000)
  }

  /**
   * Reload assembly status. Useful if the socket doesn't work.
   *
   * Pass `diff: false` to avoid emitting diff events, instead only emitting
   * 'status'.
   *
   * @param {object} [options]
   * @param {boolean} [options.diff=true] Whether to emit the events that lead
   *   from the current status to the fetched one.
   * @returns {Promise} Settles once the response has been handled. Failures
   *   are reported through 'error' events instead of rejecting.
   */
  _fetchStatus ({ diff = true } = {}) {
    return fetchWithNetworkError(this.status.assembly_ssl_url)
      .then((response) => response.json())
      .then((status) => {
        // Avoid updating if we closed during this request's lifetime.
        if (this.closed) return
        this.emit('status', status)

        if (diff) {
          this.updateStatus(status)
        } else {
          this.status = status
        }
      })
      .catch((err) => this._onError(err))
  }

  /**
   * Fetch the latest status and emit events for everything that changed.
   *
   * @returns {Promise} The promise returned by `_fetchStatus`.
   */
  update () {
    return this._fetchStatus({ diff: true })
  }

  /**
   * Update this assembly's status with a full new object. Events will be
   * emitted for status changes, new files, and new results.
   *
   * @param {object} next The new assembly status object.
   */
  updateStatus (next) {
    this._diffStatus(this.status, next)
    this.status = next
  }

  /**
   * Diff two assembly statuses, and emit the events necessary to go from `prev`
   * to `next`.
   *
   * @param {object} prev The previous assembly status.
   * @param {object} next The new assembly status.
   */
  _diffStatus (prev, next) {
    const prevStatus = prev.ok
    const nextStatus = next.ok

    if (next.error && !prev.error) {
      return this._onError(next)
    }

    // A status object (an error response, for example) may lack `uploads` or
    // `results`. Treat a missing collection as empty so the diff does not
    // throw and resurface as a misleading 'error' event on every poll.
    const prevUploads = prev.uploads || []
    const nextUploads = next.uploads || []
    const prevResults = prev.results || {}
    const nextResults = next.results || {}

    // Desired emit order:
    //  - executing
    //  - (n × upload)
    //  - metadata
    //  - (m × result)
    //  - finished
    // The below checks run in this order, that way even if we jump from
    // UPLOADING straight to COMPLETED all the events are emitted as expected.

    const nowExecuting =
      isStatus(nextStatus, ASSEMBLY_EXECUTING) &&
      !isStatus(prevStatus, ASSEMBLY_EXECUTING)
    if (nowExecuting) {
      // Without WebSockets, this is our only way to tell if uploading finished.
      // Hence, we emit this just before the 'upload's and before the 'metadata'
      // event for the most intuitive ordering, corresponding to the _usual_
      // ordering (if not guaranteed) that you'd get on the WebSocket.
      this.emit('executing')
    }

    // Find new uploaded files.
    Object.keys(nextUploads)
      .filter((upload) => !has(prevUploads, upload))
      .map((upload) => nextUploads[upload])
      .forEach((upload) => {
        this.emit('upload', upload)
      })

    if (nowExecuting) {
      this.emit('metadata')
    }

    // Find new results: anything whose id was not already listed for its step.
    Object.keys(nextResults).forEach((stepName) => {
      const nextStepResults = nextResults[stepName]
      const prevStepResults = prevResults[stepName]

      nextStepResults
        .filter((n) => !prevStepResults || !prevStepResults.some((p) => p.id === n.id))
        .forEach((result) => {
          this.emit('result', stepName, result)
        })
    })

    if (isStatus(nextStatus, ASSEMBLY_COMPLETED) &&
        !isStatus(prevStatus, ASSEMBLY_COMPLETED)) {
      this.emit('finished')
    }
  }

  /**
   * Stop updating this assembly: mark it closed, disconnect the socket if
   * there is one, and stop the poll timer.
   */
  close () {
    this.closed = true
    if (this.socket) {
      this.socket.disconnect()
      this.socket = null
    }
    clearInterval(this.pollInterval)
    // Do not keep a stale timer id around once the timer is gone.
    this.pollInterval = null
  }
}
262
module.exports = TransloaditAssembly