UNPKG

4.95 kBJavaScriptView Raw
1const assert = require('assert');
2const dgram = require('dgram');
3const net = require('net');
4const dns = require('dns');
5const { PROTOCOL } = require('./constants');
6
// Holds the optional `unix-dgram` module. It stays undefined until
// createUdsTransport() requires it, so the other protocols never need the
// dependency to be installed.
let unixDgram;

// Socket path used by the uds transport when `args.path` is not supplied.
const UDS_PATH_DEFAULT = '/var/run/datadog/dsd.socket';
11
/**
 * Converts a payload to a string that ends with a newline.
 *
 * An empty payload is returned as an empty string, and a payload that already
 * ends in '\n' is returned unchanged.
 *
 * @param {Buffer|string} buf - payload; only its `toString()` is used.
 * @returns {string} the payload text, newline-terminated when non-empty.
 */
const addEol = (buf) => {
  const text = buf.toString();
  if (text.length === 0 || text.endsWith('\n')) {
    return text;
  }
  return `${text}\n`;
};
19
20// interface Transport {
21// emit(name: string, payload: any):void;
22// on(name: string, listener: Function):void;
23// removeListener(name: string, listener: Function):void;
24// send(buf: Buffer, callback: Function):void;
25// close():void;
26// unref(): void;
27// }
/**
 * Builds a Transport on top of a TCP connection to `args.host:args.port`.
 *
 * The connection is opened immediately with keep-alive enabled, and is
 * unref'd so that it never prevents the process from exiting on its own.
 *
 * @param {{port: number, host: string}} args - connection target.
 * @returns {object} Transport (emit/on/removeListener/send/close/unref).
 */
const createTcpTransport = args => {
  const connection = net.connect(args.port, args.host);
  connection.setKeepAlive(true);
  // An idle metrics connection must not keep node alive.
  connection.unref();

  // Hands out connection methods already bound to the connection itself.
  const bound = method => connection[method].bind(connection);

  // Every message is newline-terminated before it is written as 'ascii'.
  const send = (buf, callback) => {
    connection.write(addEol(buf), 'ascii', callback);
  };
  const close = () => connection.destroy();

  return {
    emit: bound('emit'),
    on: bound('on'),
    removeListener: bound('removeListener'),
    send,
    close,
    unref: bound('unref')
  };
};
45
/**
 * Builds a Transport on top of an unref'd `udp4` datagram socket.
 *
 * When `args.cacheDns` is truthy the host is resolved through `dns.lookup`
 * and the resulting address is reused until it is older than
 * `args.cacheDnsTtl` milliseconds; otherwise `args.host` is passed straight to
 * the socket on every send.
 *
 * @param {{host: string, port: number, cacheDns?: boolean, cacheDnsTtl?: number}} args
 * @returns {object} Transport (emit/on/removeListener/send/close/unref).
 */
const createUdpTransport = args => {
  const udpSocket = dgram.createSocket('udp4');
  // An idle metrics socket must not keep node alive.
  udpSocket.unref();

  // DNS cache state: nothing resolved yet, stamped at the epoch.
  let cachedAddress;
  let cachedAt = 0;

  // Hands out socket methods already bound to the socket itself.
  const bound = method => udpSocket[method].bind(udpSocket);

  // Sends the whole buffer to `address` on the configured port.
  const sendTo = (address, buf, callback) => {
    udpSocket.send(buf, 0, buf.length, args.port, address, callback);
  };

  const sendViaDnsCache = (buf, callback) => {
    const now = Date.now();
    const isStale = cachedAddress === undefined || now - cachedAt > args.cacheDnsTtl;
    if (!isStale) {
      sendTo(cachedAddress, buf, callback);
      return;
    }

    dns.lookup(args.host, (error, address) => {
      if (error) {
        // Resolution failed: report it and leave the cache untouched.
        callback(error);
        return;
      }
      cachedAddress = address;
      // The entry is stamped with the time the lookup was started.
      cachedAt = now;
      sendTo(address, buf, callback);
    });
  };

  const send = (buf, callback) => {
    if (!args.cacheDns) {
      sendTo(args.host, buf, callback);
      return;
    }
    sendViaDnsCache(buf, callback);
  };

  return {
    emit: bound('emit'),
    on: bound('on'),
    removeListener: bound('removeListener'),
    send,
    close: bound('close'),
    unref: bound('unref')
  };
};
88
/**
 * Builds a Transport on top of a Unix domain datagram socket.
 *
 * The optional `unix-dgram` module is required lazily so that the other
 * protocols work without it. The socket is connected to `args.path`, or to
 * UDS_PATH_DEFAULT when no path is given.
 *
 * @param {{path?: string}} args - transport options.
 * @returns {object} Transport (emit/on/removeListener/send/close/unref).
 * @throws {Error} when `unix-dgram` cannot be loaded; the underlying failure
 *   is available on the thrown error's `cause` property.
 */
const createUdsTransport = args => {
  try {
    // This will not always be available, as noted in the error message below
    unixDgram = require('unix-dgram'); // eslint-disable-line global-require
  } catch (err) {
    const loadError = new Error(
      'The library `unix_dgram`, needed for the uds protocol to work, is not installed. ' +
      'You need to pick another protocol to use hot-shots. ' +
      'See the hot-shots README for additional details.'
    );
    // Keep the real reason reachable: a failed native build or load error is
    // otherwise indistinguishable from a missing install. Assigned as a plain
    // property so it also works on Node versions without Error `cause` support.
    loadError.cause = err;
    throw loadError;
  }
  const udsPath = args.path ? args.path : UDS_PATH_DEFAULT;
  const socket = unixDgram.createSocket('unix_dgram');
  socket.connect(udsPath);

  return {
    emit: socket.emit.bind(socket),
    on: socket.on.bind(socket),
    removeListener: socket.removeListener.bind(socket),
    send: socket.send.bind(socket),
    close: () => {
      socket.close();
      // close is synchronous, and the socket will not emit a
      // close event, hence emulating standard behaviour by doing this:
      socket.emit('close');
    },
    unref: () => {
      throw new Error('unix-dgram does not implement unref for sockets');
    }
  };
};
120
/**
 * Builds a Transport that writes newline-terminated messages to a
 * caller-supplied stream.
 *
 * @param {{stream: object}} args - must carry the target stream as `stream`.
 * @returns {object} Transport (emit/on/removeListener/send/close/unref);
 *   `unref` always throws because this transport does not support it.
 * @throws when `args.stream` is missing (assertion failure).
 */
const createStreamTransport = (args) => {
  const { stream } = args;
  assert(stream, '`stream` option required');

  // Hands out stream methods already bound to the stream itself.
  const bound = method => stream[method].bind(stream);

  const send = (buf, callback) => stream.write(addEol(buf), callback);

  const close = () => {
    stream.destroy();

    // Node v8 doesn't fire `close` event on stream destroy.
    const [major] = process.version.split('.');
    if (major === 'v8') {
      stream.emit('close');
    }
  };

  const unref = () => {
    throw new Error('stream transport does not support unref');
  };

  return {
    emit: bound('emit'),
    on: bound('on'),
    removeListener: bound('removeListener'),
    send,
    close,
    unref
  };
};
143
144module.exports = (instance, args) => {
145 let transport = null;
146 const protocol = args.protocol || PROTOCOL.UDP;
147
148 try {
149 if (protocol === PROTOCOL.TCP) {
150 transport = createTcpTransport(args);
151 } else if (protocol === PROTOCOL.UDS) {
152 transport = createUdsTransport(args);
153 } else if (protocol === PROTOCOL.UDP) {
154 transport = createUdpTransport(args);
155 } else if (protocol === PROTOCOL.STREAM) {
156 transport = createStreamTransport(args);
157 } else {
158 throw new Error(`Unsupported protocol '${protocol}'`);
159 }
160 transport.type = protocol;
161 } catch (e) {
162 if (instance.errorHandler) {
163 instance.errorHandler(e);
164 } else {
165 console.error(e);
166 }
167 }
168
169 return transport;
170};