UNPKG

7.69 kBJavaScriptView Raw
1/**
2 * Copyright 2017 Google Inc. All rights reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16const {helper} = require('./helper');
17const debugProtocol = require('debug')('puppeteer:protocol');
18const debugSession = require('debug')('puppeteer:session');
19
20const EventEmitter = require('events');
21const WebSocket = require('ws');
22const Pipe = require('./Pipe');
23
24class Connection extends EventEmitter {
25 /**
26 * @param {string} url
27 * @param {number=} delay
28 * @return {!Promise<!Connection>}
29 */
30 static async createForWebSocket(url, delay = 0) {
31 return new Promise((resolve, reject) => {
32 const ws = new WebSocket(url, { perMessageDeflate: false });
33 ws.on('open', () => resolve(new Connection(url, ws, delay)));
34 ws.on('error', reject);
35 });
36 }
37
38 /**
39 * @param {!NodeJS.WritableStream} pipeWrite
40 * @param {!NodeJS.ReadableStream} pipeRead
41 * @param {number=} delay
42 * @return {!Connection}
43 */
44 static createForPipe(pipeWrite, pipeRead, delay = 0) {
45 return new Connection('', new Pipe(pipeWrite, pipeRead), delay);
46 }
47
48 /**
49 * @param {string} url
50 * @param {!Puppeteer.ConnectionTransport} transport
51 * @param {number=} delay
52 */
53 constructor(url, transport, delay = 0) {
54 super();
55 this._url = url;
56 this._lastId = 0;
57 /** @type {!Map<number, {resolve: function, reject: function, error: !Error, method: string}>}*/
58 this._callbacks = new Map();
59 this._delay = delay;
60
61 this._transport = transport;
62 this._transport.on('message', this._onMessage.bind(this));
63 this._transport.on('close', this._onClose.bind(this));
64 /** @type {!Map<string, !CDPSession>}*/
65 this._sessions = new Map();
66 }
67
68 /**
69 * @return {string}
70 */
71 url() {
72 return this._url;
73 }
74
75 /**
76 * @param {string} method
77 * @param {!Object=} params
78 * @return {!Promise<?Object>}
79 */
80 send(method, params = {}) {
81 const id = ++this._lastId;
82 const message = JSON.stringify({id, method, params});
83 debugProtocol('SEND ► ' + message);
84 this._transport.send(message);
85 return new Promise((resolve, reject) => {
86 this._callbacks.set(id, {resolve, reject, error: new Error(), method});
87 });
88 }
89
90 /**
91 * @param {function()} callback
92 */
93 setClosedCallback(callback) {
94 this._closeCallback = callback;
95 }
96
97 /**
98 * @param {string} message
99 */
100 async _onMessage(message) {
101 if (this._delay)
102 await new Promise(f => setTimeout(f, this._delay));
103 debugProtocol('◀ RECV ' + message);
104 const object = JSON.parse(message);
105 if (object.id) {
106 const callback = this._callbacks.get(object.id);
107 // Callbacks could be all rejected if someone has called `.dispose()`.
108 if (callback) {
109 this._callbacks.delete(object.id);
110 if (object.error)
111 callback.reject(rewriteError(callback.error, `Protocol error (${callback.method}): ${object.error.message} ${object.error.data}`));
112 else
113 callback.resolve(object.result);
114 }
115 } else {
116 if (object.method === 'Target.receivedMessageFromTarget') {
117 const session = this._sessions.get(object.params.sessionId);
118 if (session)
119 session._onMessage(object.params.message);
120 } else if (object.method === 'Target.detachedFromTarget') {
121 const session = this._sessions.get(object.params.sessionId);
122 if (session)
123 session._onClosed();
124 this._sessions.delete(object.params.sessionId);
125 } else {
126 this.emit(object.method, object.params);
127 }
128 }
129 }
130
131 _onClose() {
132 if (this._closeCallback) {
133 this._closeCallback();
134 this._closeCallback = null;
135 }
136 this._transport.removeAllListeners();
137 // If transport throws any error at this point of time, we don't care and should swallow it.
138 this._transport.on('error', () => {});
139 for (const callback of this._callbacks.values())
140 callback.reject(rewriteError(callback.error, `Protocol error (${callback.method}): Target closed.`));
141 this._callbacks.clear();
142 for (const session of this._sessions.values())
143 session._onClosed();
144 this._sessions.clear();
145 }
146
147 dispose() {
148 this._onClose();
149 this._transport.close();
150 }
151
152 /**
153 * @param {string} targetId
154 * @return {!Promise<!CDPSession>}
155 */
156 async createSession(targetId) {
157 const {sessionId} = await this.send('Target.attachToTarget', {targetId});
158 const session = new CDPSession(this, targetId, sessionId);
159 this._sessions.set(sessionId, session);
160 return session;
161 }
162}
163
164class CDPSession extends EventEmitter {
165 /**
166 * @param {!Connection} connection
167 * @param {string} targetId
168 * @param {string} sessionId
169 */
170 constructor(connection, targetId, sessionId) {
171 super();
172 this._lastId = 0;
173 /** @type {!Map<number, {resolve: function, reject: function, error: !Error, method: string}>}*/
174 this._callbacks = new Map();
175 this._connection = connection;
176 this._targetId = targetId;
177 this._sessionId = sessionId;
178 }
179
180 /**
181 * @param {string} method
182 * @param {!Object=} params
183 * @return {!Promise<?Object>}
184 */
185 send(method, params = {}) {
186 if (!this._connection)
187 return Promise.reject(new Error(`Protocol error (${method}): Session closed. Most likely the page has been closed.`));
188 const id = ++this._lastId;
189 const message = JSON.stringify({id, method, params});
190 debugSession('SEND ► ' + message);
191 this._connection.send('Target.sendMessageToTarget', {sessionId: this._sessionId, message}).catch(e => {
192 // The response from target might have been already dispatched.
193 if (!this._callbacks.has(id))
194 return;
195 const callback = this._callbacks.get(id);
196 this._callbacks.delete(id);
197 callback.reject(rewriteError(callback.error, e && e.message));
198 });
199 return new Promise((resolve, reject) => {
200 this._callbacks.set(id, {resolve, reject, error: new Error(), method});
201 });
202 }
203
204 /**
205 * @param {string} message
206 */
207 _onMessage(message) {
208 debugSession('◀ RECV ' + message);
209 const object = JSON.parse(message);
210 if (object.id && this._callbacks.has(object.id)) {
211 const callback = this._callbacks.get(object.id);
212 this._callbacks.delete(object.id);
213 if (object.error)
214 callback.reject(rewriteError(callback.error, `Protocol error (${callback.method}): ${object.error.message} ${object.error.data}`));
215 else
216 callback.resolve(object.result);
217 } else {
218 console.assert(!object.id);
219 this.emit(object.method, object.params);
220 }
221 }
222
223 async detach() {
224 await this._connection.send('Target.detachFromTarget', {sessionId: this._sessionId});
225 }
226
227 _onClosed() {
228 for (const callback of this._callbacks.values())
229 callback.reject(rewriteError(callback.error, `Protocol error (${callback.method}): Target closed.`));
230 this._callbacks.clear();
231 this._connection = null;
232 }
233}
234helper.tracePublicAPI(CDPSession);
235
236/**
237 * @param {!Error} error
238 * @param {string} message
239 * @return {!Error}
240 */
241function rewriteError(error, message) {
242 error.message = message;
243 return error;
244}
245
246module.exports = {Connection, CDPSession};