UNPKG

7.12 kBPlain TextView Raw
1import isBlob from 'is-blob';
2import parse from './parse.js';
3import WebSocket from 'isomorphic-ws';
4import endpoints from './endpoints.js';
5import EventEmitter from 'eventemitter3';
6
7import {
8 Bar,
9 Channel,
10 DataSource,
11 DefaultCredentials,
12 Quote,
13 Trade,
14 TradeUpdate,
15 Message,
16 Endpoints,
17} from './entities.js';
18
19export declare interface Events {
20 open: (stream: AlpacaStream) => void;
21 close: (stream: AlpacaStream) => void;
22 authenticated: (stream: AlpacaStream) => void;
23 success: (message: Message) => void;
24 error: (message: WebSocket.ErrorEvent) => void;
25 subscription: (message: Message) => void;
26 message: (message: Object) => void;
27 trade_updates: (update: TradeUpdate) => void;
28 trade: (trade: Trade) => void;
29 quote: (quote: Quote) => void;
30 bar: (bar: Bar) => void;
31}
32
33export declare interface AlpacaStream {
34 on<U extends keyof Events>(event: U, listener: Events[U]): this;
35 once<U extends keyof Events>(event: U, listener: Events[U]): this;
36 emit<U extends keyof Events>(
37 event: U,
38 ...args: Parameters<Events[U]>
39 ): boolean;
40}
41
42export class AlpacaStream extends EventEmitter<string | symbol | any> {
43 private host: string;
44 private connection: WebSocket;
45 private authenticated: boolean;
46 private baseURLs: Endpoints = endpoints;
47
48 constructor(
49 protected params: {
50 credentials: DefaultCredentials;
51 type: 'account' | 'market_data';
52 source?: DataSource;
53 endpoints?: Endpoints | Map<keyof Endpoints, any>;
54 },
55 ) {
56 // construct EventEmitter
57 super();
58
59 // override endpoints if custom provided
60 if ('endpoints' in params) {
61 this.baseURLs = Object.assign(endpoints, params.endpoints);
62 }
63
64 if (
65 // if not specified
66 !('paper' in params.credentials) &&
67 // and live key isn't already provided
68 !('key' in params.credentials && params.credentials.key.startsWith('A'))
69 ) {
70 params.credentials['paper'] = true;
71 }
72
73 // assign the host we will connect to
74 switch (params.type) {
75 case 'account':
76 this.host = params.credentials.paper
77 ? this.baseURLs.websocket.account.replace('api.', 'paper-api.')
78 : this.baseURLs.websocket.account;
79 break;
80 case 'market_data':
81 this.host = this.baseURLs.websocket.market_data(this.params.source);
82 break;
83 default:
84 this.host = 'unknown';
85 }
86
87 this.connection = new WebSocket(this.host);
88 this.connection.onopen = () => {
89 let message = {};
90
91 switch (this.params.type) {
92 case 'account':
93 message = {
94 action: 'authenticate',
95 data: {
96 key_id: params.credentials.key,
97 secret_key: params.credentials.secret,
98 },
99 };
100 break;
101 case 'market_data':
102 // {"action":"auth","key":"PK*****","secret":"*************"}
103 message = { action: 'auth', ...params.credentials };
104 break;
105 }
106
107 this.connection.send(JSON.stringify(message));
108
109 // pass through
110 this.emit('open', this);
111 };
112
113 // pass through
114 this.connection.onclose = () => this.emit('close', this);
115
116 this.connection.onmessage = async (event: any) => {
117 let data = event.data;
118
119 if (isBlob(data)) {
120 data = await event.data.text();
121 } else if (data instanceof ArrayBuffer) {
122 data = String.fromCharCode(...new Uint8Array(event.data));
123 }
124
125 let parsed = JSON.parse(data),
126 messages = this.params.type == 'account' ? [parsed] : parsed;
127
128 messages.forEach((message: any) => {
129 // pass the message
130 this.emit('message', message);
131
132 // pass authenticated event
133 if ('T' in message && message.msg == 'authenticated') {
134 this.authenticated = true;
135 this.emit('authenticated', this);
136 } else if ('stream' in message && message.stream == 'authorization') {
137 if (message.data.status == 'authorized') {
138 this.authenticated = true;
139 this.emit('authenticated', this);
140 }
141 }
142
143 // pass trade_updates event
144 if ('stream' in message && message.stream == 'trade_updates') {
145 this.emit('trade_updates', parse.trade_update(message.data));
146 }
147
148 // pass trade, quote, bar event
149 const x: { [index: string]: keyof Events } = {
150 success: 'success',
151 subscription: 'subscription',
152 error: 'error',
153 t: 'trade',
154 q: 'quote',
155 b: 'bar',
156 };
157
158 if ('T' in message) {
159 this.emit(x[message.T.split('.')[0]], message);
160 }
161 });
162 };
163
164 // pass the error
165 this.connection.onerror = (err: WebSocket.ErrorEvent) => {
166 this.emit('error', err);
167 };
168 }
169
170 /**
171 * Retrieve the underlying WebSocket connection AlpacaStream uses.
172 * Now callers can read and modify properties of the web socket
173 * i.e., close the websocket with AlpacaStream.getConnection().close().
174 * @returns a WebSocket object
175 */
176 getConnection() {
177 return this.connection;
178 }
179
180 /**
181 * Subscribe to an account or data stream channel.
182 * @param channel trades, quotes, bars, trade_updates
183 * @param symbols only use with data stream ex. [ "AAPL", "TSLA", ... ]
184 */
185 subscribe(channel: Channel, symbols: string[] = []) {
186 switch (this.params.type) {
187 case 'account':
188 // {"action":"listen","data":{"streams":["trade_updates"]}}
189 this.send(
190 JSON.stringify({ action: 'listen', data: { streams: [channel] } }),
191 );
192 break;
193 case 'market_data':
194 // {"action":"subscribe","trades":["AAPL"],"quotes":["AMD","CLDR"],"bars":["AAPL","VOO"]}
195 let message: any = { action: 'subscribe' };
196 message[channel] = symbols;
197 this.send(JSON.stringify(message));
198 break;
199 }
200
201 return this;
202 }
203
204 /**
205 * Unsubscribe to an account or data stream channel.
206 * @param channel trades, quotes, bars, trade_updates
207 * @param symbols only use with data stream ex. [ "AAPL", "TSLA", ... ]
208 */
209 unsubscribe(channel: Channel, symbols: string[] = []) {
210 switch (this.params.type) {
211 case 'account':
212 // {"action":"unlisten","data":{"streams":["trade_updates"]}}
213 this.send(
214 JSON.stringify({ action: 'unlisten', data: { streams: [channel] } }),
215 );
216 break;
217 case 'market_data':
218 // {"action":"unsubscribe","trades":["AAPL"],"quotes":["AMD","CLDR"],"bars":["AAPL","VOO"]}
219 let message: any = { action: 'unsubscribe' };
220 message[channel] = symbols;
221 this.send(JSON.stringify(message));
222 break;
223 }
224
225 return this;
226 }
227
228 private send(message: any) {
229 // don't bother if we aren't authenticated
230 if (!this.authenticated) {
231 throw new Error('not authenticated');
232 }
233
234 // if the message is in object form, stringify it for the user
235 if (typeof message == 'object') {
236 message = JSON.stringify(message);
237 }
238
239 // send it off
240 this.connection.send(message);
241
242 // chainable return
243 return this;
244 }
245}