UNPKG

9.02 kBJavaScriptView Raw
1/**
2 * Copyright 2017 Google Inc. All rights reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16const {helper, assert} = require('./helper');
17const debugProtocol = require('debug')('puppeteer:protocol');
18const debugSession = require('debug')('puppeteer:session');
19
20const EventEmitter = require('events');
21const WebSocket = require('ws');
22const Pipe = require('./Pipe');
23
/**
 * Root protocol connection. Serializes commands onto a transport, matches
 * responses back to their pending promises by id, routes session-scoped
 * messages to the owning CDPSession and re-emits every other protocol
 * notification as an event named after its method.
 */
class Connection extends EventEmitter {
  /**
   * Opens a WebSocket to `url` and resolves with a Connection wrapping it as
   * soon as the socket reports 'open'. A socket 'error' rejects the promise.
   *
   * @param {string} url
   * @param {number=} delay
   * @return {!Promise<!Connection>}
   */
  static async createForWebSocket(url, delay = 0) {
    return new Promise((fulfill, fail) => {
      const socket = new WebSocket(url, { perMessageDeflate: false });
      socket.on('open', () => {
        fulfill(new Connection(url, socket, delay));
      });
      socket.on('error', fail);
    });
  }

  /**
   * Builds a Connection on top of a pair of pipe streams. Such a connection
   * has no URL, so `url()` returns the empty string.
   *
   * @param {!NodeJS.WritableStream} pipeWrite
   * @param {!NodeJS.ReadableStream} pipeRead
   * @param {number=} delay
   * @return {!Connection}
   */
  static createForPipe(pipeWrite, pipeRead, delay = 0) {
    const transport = new Pipe(pipeWrite, pipeRead);
    return new Connection('', transport, delay);
  }

  /**
   * @param {string} url
   * @param {!Puppeteer.ConnectionTransport} transport
   * @param {number=} delay when non-zero, every incoming message is held back
   *     for this many milliseconds before it is processed
   */
  constructor(url, transport, delay = 0) {
    super();
    this._url = url;
    this._delay = delay;
    // Ids are handed out sequentially; the first command gets id 1.
    this._lastId = 0;
    /** @type {!Map<number, {resolve: function, reject: function, error: !Error, method: string}>}*/
    this._callbacks = new Map();
    /** @type {!Map<string, !CDPSession>}*/
    this._sessions = new Map();

    this._transport = transport;
    transport.on('message', this._onMessage.bind(this));
    transport.on('close', this._onClose.bind(this));
  }

  /**
   * @return {string}
   */
  url() {
    return this._url;
  }

  /**
   * Writes a command to the transport and returns a promise that settles when
   * the response carrying the same id is received.
   *
   * @param {string} method
   * @param {!Object=} params
   * @return {!Promise<?Object>}
   */
  send(method, params = {}) {
    this._lastId += 1;
    const id = this._lastId;
    const payload = JSON.stringify({id, method, params});
    debugProtocol(`SEND ► ${payload}`);
    this._transport.send(payload);
    return new Promise((resolve, reject) => {
      // The Error is created up front so that a later rejection carries the
      // stack of the code that issued the command.
      const error = new Error();
      this._callbacks.set(id, {resolve, reject, error, method});
    });
  }

  /**
   * Registers the function invoked (once) when the connection closes.
   *
   * @param {function()} callback
   */
  setClosedCallback(callback) {
    this._closeCallback = callback;
  }

  /**
   * Handles one raw message from the transport.
   *
   * @param {string} message
   */
  async _onMessage(message) {
    if (this._delay)
      await new Promise(wake => setTimeout(wake, this._delay));
    debugProtocol('◀ RECV ' + message);
    const object = JSON.parse(message);

    // Anything with an id is a response to a command sent earlier.
    if (object.id) {
      const pending = this._callbacks.get(object.id);
      // The entry may be gone already: closing the connection rejects and
      // clears every pending callback.
      if (!pending)
        return;
      this._callbacks.delete(object.id);
      if (object.error)
        pending.reject(createProtocolError(pending.error, pending.method, object));
      else
        pending.resolve(object.result);
      return;
    }

    switch (object.method) {
      case 'Target.receivedMessageFromTarget': {
        const {sessionId, message: payload} = object.params;
        const session = this._sessions.get(sessionId);
        if (session)
          session._onMessage(payload);
        break;
      }
      case 'Target.detachedFromTarget': {
        const {sessionId} = object.params;
        const session = this._sessions.get(sessionId);
        if (session)
          session._onClosed();
        this._sessions.delete(sessionId);
        break;
      }
      default:
        this.emit(object.method, object.params);
    }
  }

  /**
   * Tears the connection state down: fires the close callback, silences the
   * transport, rejects everything still pending and closes every session.
   */
  _onClose() {
    if (this._closeCallback) {
      this._closeCallback();
      this._closeCallback = null;
    }
    this._transport.removeAllListeners();
    // If transport throws any error at this point of time, we don't care and should swallow it.
    this._transport.on('error', () => {});
    this._callbacks.forEach(pending => {
      pending.reject(rewriteError(pending.error, `Protocol error (${pending.method}): Target closed.`));
    });
    this._callbacks.clear();
    this._sessions.forEach(session => {
      session._onClosed();
    });
    this._sessions.clear();
  }

  /**
   * Runs the close bookkeeping and then closes the underlying transport.
   */
  dispose() {
    this._onClose();
    this._transport.close();
  }

  /**
   * Attaches to the given target and registers a CDPSession for the session
   * id returned by the browser.
   *
   * @param {Protocol.Target.TargetInfo} targetInfo
   * @return {!Promise<!CDPSession>}
   */
  async createSession(targetInfo) {
    const response = await this.send('Target.attachToTarget', {targetId: targetInfo.targetId});
    const sessionId = response.sessionId;
    const session = new CDPSession(this, targetInfo.type, sessionId);
    this._sessions.set(sessionId, session);
    return session;
  }
}
163
/**
 * A protocol session attached to a single target. Commands are wrapped in
 * `Target.sendMessageToTarget` and sent through the parent, which is either
 * the root Connection or another CDPSession when sessions are nested.
 */
class CDPSession extends EventEmitter {
  /**
   * @param {!Connection|!CDPSession} connection parent used to tunnel messages
   * @param {string} targetType only used to build error messages
   * @param {string} sessionId
   */
  constructor(connection, targetType, sessionId) {
    super();
    this._lastId = 0;
    /** @type {!Map<number, {resolve: function, reject: function, error: !Error, method: string}>}*/
    this._callbacks = new Map();
    /** @type {null|Connection|CDPSession} */
    this._connection = connection;
    this._targetType = targetType;
    this._sessionId = sessionId;
    /** @type {!Map<string, !CDPSession>}*/
    this._sessions = new Map();
  }

  /**
   * Sends a command to the target and returns a promise for its result.
   * Rejects immediately when the session has already been closed.
   *
   * @param {string} method
   * @param {!Object=} params
   * @return {!Promise<?Object>}
   */
  send(method, params = {}) {
    if (!this._connection)
      return Promise.reject(new Error(`Protocol error (${method}): Session closed. Most likely the ${this._targetType} has been closed.`));
    const id = ++this._lastId;
    const message = JSON.stringify({id, method, params});
    debugSession('SEND ► ' + message);
    this._connection.send('Target.sendMessageToTarget', {sessionId: this._sessionId, message}).catch(e => {
      // The response from target might have been already dispatched.
      if (!this._callbacks.has(id))
        return;
      const callback = this._callbacks.get(id);
      this._callbacks.delete(id);
      callback.reject(rewriteError(callback.error, e && e.message));
    });
    return new Promise((resolve, reject) => {
      // The Error is created here so that a later rejection carries the stack
      // of the code that issued the command.
      this._callbacks.set(id, {resolve, reject, error: new Error(), method});
    });
  }

  /**
   * Handles one message addressed to this session: settles the matching
   * pending command, or routes/re-emits it as a notification.
   *
   * @param {string} message
   */
  _onMessage(message) {
    debugSession('◀ RECV ' + message);
    const object = JSON.parse(message);
    if (object.id && this._callbacks.has(object.id)) {
      const callback = this._callbacks.get(object.id);
      this._callbacks.delete(object.id);
      if (object.error)
        callback.reject(createProtocolError(callback.error, callback.method, object));
      else
        callback.resolve(object.result);
    } else {
      if (object.method === 'Target.receivedMessageFromTarget') {
        const session = this._sessions.get(object.params.sessionId);
        if (session)
          session._onMessage(object.params.message);
      } else if (object.method === 'Target.detachedFromTarget') {
        const session = this._sessions.get(object.params.sessionId);
        if (session) {
          session._onClosed();
          this._sessions.delete(object.params.sessionId);
        }
      }
      // Only notifications may reach this point; a response whose id has no
      // pending callback trips the assertion.
      assert(!object.id);
      // Unlike Connection, the Target.* notifications handled above are also
      // re-emitted to listeners of this session.
      this.emit(object.method, object.params);
    }
  }

  /**
   * Asks the parent to detach this session from its target.
   * Throws when the session has already been closed.
   */
  async detach() {
    if (!this._connection)
      throw new Error(`Session already detached. Most likely the ${this._targetType} has been closed.`);
    await this._connection.send('Target.detachFromTarget', {sessionId: this._sessionId});
  }

  /**
   * Marks the session as closed: rejects every pending command, drops the
   * reference to the parent and closes all nested sessions.
   */
  _onClosed() {
    for (const callback of this._callbacks.values())
      callback.reject(rewriteError(callback.error, `Protocol error (${callback.method}): Target closed.`));
    this._callbacks.clear();
    this._connection = null;
    // Nested sessions are tunnelled through this one and can no longer receive
    // anything once it is closed. Close them too (mirroring
    // Connection._onClose) so their pending commands reject instead of
    // staying unsettled forever.
    for (const session of this._sessions.values())
      session._onClosed();
    this._sessions.clear();
  }

  /**
   * Creates and registers a nested session that is tunnelled through this one.
   *
   * @param {string} targetType
   * @param {string} sessionId
   * @return {!CDPSession}
   */
  _createSession(targetType, sessionId) {
    const session = new CDPSession(this, targetType, sessionId);
    this._sessions.set(sessionId, session);
    return session;
  }
}
// Hand the CDPSession class to the shared helper from './helper'.
// NOTE(review): presumably this instruments the class's public methods for
// API tracing — confirm against helper.tracePublicAPI.
helper.tracePublicAPI(CDPSession);
261
/**
 * Turns a protocol error response into an Error by rewriting the message of
 * the pre-allocated `error` to
 * `Protocol error (<method>): <message>[ <data>]`. The data part is appended
 * only when the response's error object has a `data` key.
 *
 * @param {!Error} error
 * @param {string} method
 * @param {{error: {message: string, data: any}}} object
 * @return {!Error}
 */
function createProtocolError(error, method, object) {
  const details = object.error;
  const parts = [`Protocol error (${method}): ${details.message}`];
  if ('data' in details)
    parts.push(`${details.data}`);
  return rewriteError(error, parts.join(' '));
}
274
/**
 * Replaces the message of an existing Error in place and hands the very same
 * object back, so callers can reject with an error that was created earlier.
 *
 * @param {!Error} error
 * @param {string} message
 * @return {!Error}
 */
function rewriteError(error, message) {
  const rewritten = error;
  rewritten.message = message;
  return rewritten;
}
284
285module.exports = {Connection, CDPSession};