1 | const io = requireSocketIo
|
2 | const Emitter = require('component-emitter')
|
3 | const has = require('@uppy/utils/lib/hasProperty')
|
4 | const parseUrl = require('./parseUrl')
|
5 | const NetworkError = require('@uppy/utils/lib/NetworkError')
|
6 | const fetchWithNetworkError = require('@uppy/utils/lib/fetchWithNetworkError')
|
7 |
|
8 |
|
9 |
|
10 |
|
11 |
|
12 |
|
// Cached `socket.io-client` module; stays undefined until the first call to
// `requireSocketIo()`.
let socketIo

/**
 * Load `socket.io-client` on first use and hand back the same module on
 * every later call.
 *
 * @returns {*} Whatever `require('socket.io-client')` returned.
 */
function requireSocketIo () {
  if (socketIo) {
    return socketIo
  }
  socketIo = require('socket.io-client')
  return socketIo
}
|
20 |
|
// Assembly states that are compared against the `ok` field of a status object.
const ASSEMBLY_UPLOADING = 'ASSEMBLY_UPLOADING'
const ASSEMBLY_EXECUTING = 'ASSEMBLY_EXECUTING'
const ASSEMBLY_COMPLETED = 'ASSEMBLY_COMPLETED'

// The states above listed from earliest to latest; `isStatus()` compares
// positions in this list.
const statusOrder = [ASSEMBLY_UPLOADING, ASSEMBLY_EXECUTING, ASSEMBLY_COMPLETED]
|
30 |
|
31 |
|
32 |
|
33 |
|
34 |
|
35 |
|
36 |
|
37 |
|
38 |
|
39 |
|
40 |
|
/**
 * Check whether `status` sits at or after `test` in `statusOrder`.
 *
 * Using "at or after" rather than strict equality lets a caller detect that a
 * state was passed even when it was skipped over between two observations.
 * A value that is not listed in `statusOrder` ranks before every listed one.
 *
 * @param {string} status The status to examine.
 * @param {string} test The status that should have been reached.
 * @returns {boolean} True when `status` ranks at least as high as `test`.
 */
function isStatus (status, test) {
  const actualRank = statusOrder.indexOf(status)
  const requiredRank = statusOrder.indexOf(test)
  return actualRank >= requiredRank
}
|
44 |
|
/**
 * Tracks one Transloadit Assembly and re-emits its progress as events.
 *
 * Updates arrive over a socket.io connection. While that socket is missing or
 * not connected, the assembly status URL is polled instead and the
 * differences between consecutive status objects are turned into the same
 * events.
 *
 * Events emitted by this class:
 *  - 'connect'
 *  - 'executing'
 *  - 'upload' (file)
 *  - 'metadata'
 *  - 'result' (stepName, result)
 *  - 'status' (status)
 *  - 'finished'
 *  - 'error' (Error)
 */
class TransloaditAssembly extends Emitter {
  /**
   * @param {object} assembly The initial assembly status object. This class
   *   reads `websocket_url`, `assembly_id`, `assembly_ssl_url`, `ok`, `error`,
   *   `uploads` and `results` from status objects.
   */
  constructor (assembly) {
    super()

    // The most recent assembly status object.
    this.status = assembly
    // The socket.io connection, or null while there is none.
    this.socket = null
    // Handle of the timer started by `_beginPolling()`.
    this.pollInterval = null
    // Set to true by `close()`.
    this.closed = false
  }

  /**
   * Open the socket connection and start the polling fallback.
   */
  connect () {
    this._connectSocket()
    this._beginPolling()
  }

  /**
   * Announce completion, then stop watching the assembly.
   */
  _onFinished () {
    this.emit('finished')
    this.close()
  }

  /**
   * Connect to the assembly's websocket and translate its messages into
   * events on this emitter, keeping `this.status` in step with them.
   */
  _connectSocket () {
    const parsed = parseUrl(this.status.websocket_url)
    const socket = io().connect(parsed.origin, {
      transports: ['websocket'],
      path: parsed.pathname
    })

    socket.on('connect', () => {
      // Tell the server which assembly this connection is interested in.
      socket.emit('assembly_connect', {
        id: this.status.assembly_id
      })

      this.emit('connect')
    })

    socket.on('connect_failed', () => {
      this._onError(new NetworkError('Transloadit Socket.io connection error'))
      this.socket = null
    })

    socket.on('error', () => {
      // Drop the socket; with `this.socket` unset the poller takes over.
      socket.disconnect()
      this.socket = null
    })

    socket.on('assembly_finished', () => {
      this._onFinished()
    })

    socket.on('assembly_upload_finished', (file) => {
      this.emit('upload', file)
      this.status.uploads.push(file)
    })

    socket.on('assembly_uploading_finished', () => {
      this.emit('executing')
    })

    socket.on('assembly_upload_meta_data_extracted', () => {
      this.emit('metadata')
      // Replace the stored status without emitting diff events.
      this._fetchStatus({ diff: false })
    })

    socket.on('assembly_result_finished', (stepName, result) => {
      this.emit('result', stepName, result)
      if (!this.status.results[stepName]) {
        this.status.results[stepName] = []
      }
      this.status.results[stepName].push(result)
    })

    socket.on('assembly_error', (err) => {
      this._onError(err)
      // Refresh the stored status after the error, without diff events.
      this._fetchStatus({ diff: false })
    })

    this.socket = socket
  }

  /**
   * Emit an 'error' event carrying a real Error instance.
   *
   * @param {object} err An Error or error-like object. Its `message` becomes
   *   the Error message and its own enumerable properties are copied onto the
   *   emitted Error.
   */
  _onError (err) {
    this.emit('error', Object.assign(new Error(err.message), err))
  }

  /**
   * Start polling the assembly status every two seconds. A request is only
   * sent while there is no socket or the socket is not connected.
   */
  _beginPolling () {
    // Clear a timer left over from an earlier call; otherwise it would keep
    // running with no handle left for `close()` to stop it.
    clearInterval(this.pollInterval)
    this.pollInterval = setInterval(() => {
      if (!this.socket || !this.socket.connected) {
        this._fetchStatus()
      }
    }, 2000)
  }

  /**
   * Fetch the assembly status from `assembly_ssl_url`.
   *
   * A 'status' event is emitted with the fetched object. With `diff: true`
   * the change events are emitted as well (see `updateStatus()`); with
   * `diff: false` the stored status is simply replaced.
   *
   * Once the assembly is closed, neither the response nor a failure of a
   * request that was still in flight has any effect.
   *
   * @param {object} [options]
   * @param {boolean} [options.diff=true] Whether to emit change events.
   * @returns {Promise<void>} Settles when the request has been handled;
   *   failures are reported through the 'error' event instead of rejecting.
   */
  _fetchStatus ({ diff = true } = {}) {
    return fetchWithNetworkError(this.status.assembly_ssl_url)
      .then((response) => response.json())
      .then((status) => {
        // Ignore responses that arrive after the assembly was closed.
        if (this.closed) return
        this.emit('status', status)

        if (diff) {
          this.updateStatus(status)
        } else {
          this.status = status
        }
      })
      .catch((err) => {
        // Same rule for failures: a request that was pending when the
        // assembly was closed must not raise an 'error' afterwards.
        if (this.closed) return
        this._onError(err)
      })
  }

  /**
   * Fetch the current status and emit events for everything that changed.
   *
   * @returns {Promise<void>} See `_fetchStatus()`.
   */
  update () {
    return this._fetchStatus({ diff: true })
  }

  /**
   * Replace the stored status with `next`, emitting events for the changes
   * between the two. The events are emitted before `this.status` is replaced.
   *
   * @param {object} next The new assembly status object.
   */
  updateStatus (next) {
    this._diffStatus(this.status, next)
    this.status = next
  }

  /**
   * Compare two assembly statuses and emit the events needed to get from
   * `prev` to `next`.
   *
   * If `next` carries an error that `prev` did not, only 'error' is emitted.
   * Otherwise events are emitted in this order, so that the sequence is
   * complete even when several states were skipped between the two statuses:
   *  - executing
   *  - (n x upload)
   *  - metadata
   *  - (m x result)
   *  - finished
   *
   * @param {object} prev The previous assembly status.
   * @param {object} next The new assembly status.
   */
  _diffStatus (prev, next) {
    const prevStatus = prev.ok
    const nextStatus = next.ok

    if (next.error && !prev.error) {
      return this._onError(next)
    }

    // Either status may lack `uploads` or `results`; treat a missing
    // collection as empty instead of throwing while diffing.
    const prevUploads = prev.uploads || {}
    const nextUploads = next.uploads || {}
    const prevResultsByStep = prev.results || {}
    const nextResultsByStep = next.results || {}

    const nowExecuting =
      isStatus(nextStatus, ASSEMBLY_EXECUTING) &&
      !isStatus(prevStatus, ASSEMBLY_EXECUTING)
    if (nowExecuting) {
      // Emitted ahead of the 'upload' and 'metadata' events below.
      this.emit('executing')
    }

    // Uploads whose key is present in `next` but not in `prev` are new.
    Object.keys(nextUploads)
      .filter((upload) => !has(prevUploads, upload))
      .map((upload) => nextUploads[upload])
      .forEach((upload) => {
        this.emit('upload', upload)
      })

    if (nowExecuting) {
      this.emit('metadata')
    }

    // Results whose `id` was not yet listed under the same step are new.
    Object.keys(nextResultsByStep).forEach((stepName) => {
      const nextResults = nextResultsByStep[stepName]
      const prevResults = prevResultsByStep[stepName]

      nextResults
        .filter((n) => !prevResults || !prevResults.some((p) => p.id === n.id))
        .forEach((result) => {
          this.emit('result', stepName, result)
        })
    })

    if (isStatus(nextStatus, ASSEMBLY_COMPLETED) &&
        !isStatus(prevStatus, ASSEMBLY_COMPLETED)) {
      this.emit('finished')
    }
  }

  /**
   * Stop updating this assembly: mark it closed, disconnect the socket and
   * stop the polling timer.
   */
  close () {
    this.closed = true
    if (this.socket) {
      this.socket.disconnect()
      this.socket = null
    }
    clearInterval(this.pollInterval)
  }
}
|
262 |
|
263 | module.exports = TransloaditAssembly
|