1 | const assert = require('assert');
|
2 | const dgram = require('dgram');
|
3 | const net = require('net');
|
4 | const dns = require('dns');
|
5 | const { PROTOCOL } = require('./constants');
|
6 |
|
7 |
|
8 | let unixDgram;
|
9 |
|
10 | const UDS_PATH_DEFAULT = '/var/run/datadog/dsd.socket';
|
11 |
|
12 | const addEol = (buf) => {
|
13 | let msg = buf.toString();
|
14 | if (msg.length > 0 && msg[msg.length - 1] !== '\n') {
|
15 | msg += '\n';
|
16 | }
|
17 | return msg;
|
18 | };
|
19 |
|
20 |
|
21 |
|
22 |
|
23 |
|
24 |
|
25 |
|
26 |
|
27 |
|
28 | const createTcpTransport = args => {
|
29 | const socket = net.connect(args.port, args.host);
|
30 | socket.setKeepAlive(true);
|
31 |
|
32 | socket.unref();
|
33 | return {
|
34 | emit: socket.emit.bind(socket),
|
35 | on: socket.on.bind(socket),
|
36 | removeListener: socket.removeListener.bind(socket),
|
37 | send: (buf, callback) => {
|
38 | socket.write(addEol(buf), 'ascii', callback);
|
39 | },
|
40 | close: () => socket.destroy(),
|
41 | unref: socket.unref.bind(socket)
|
42 |
|
43 | };
|
44 | };
|
45 |
|
46 | const createUdpTransport = args => {
|
47 | const socket = dgram.createSocket('udp4');
|
48 |
|
49 | socket.unref();
|
50 |
|
51 | const dnsResolutionData = {
|
52 | timestamp: new Date(0),
|
53 | resolvedAddress: undefined
|
54 | };
|
55 |
|
56 | const sendUsingDnsCache = (callback, buf) => {
|
57 | const now = Date.now();
|
58 | if (dnsResolutionData.resolvedAddress === undefined || (now - dnsResolutionData.timestamp > args.cacheDnsTtl)) {
|
59 | dns.lookup(args.host, (error, address) => {
|
60 | if (error) {
|
61 | callback(error);
|
62 | return;
|
63 | }
|
64 | dnsResolutionData.resolvedAddress = address;
|
65 | dnsResolutionData.timestamp = now;
|
66 | socket.send(buf, 0, buf.length, args.port, dnsResolutionData.resolvedAddress, callback);
|
67 | });
|
68 | } else {
|
69 | socket.send(buf, 0, buf.length, args.port, dnsResolutionData.resolvedAddress, callback);
|
70 | }
|
71 | };
|
72 |
|
73 | return {
|
74 | emit: socket.emit.bind(socket),
|
75 | on: socket.on.bind(socket),
|
76 | removeListener: socket.removeListener.bind(socket),
|
77 | send: function (buf, callback) {
|
78 | if (args.cacheDns) {
|
79 | sendUsingDnsCache(callback, buf);
|
80 | } else {
|
81 | socket.send(buf, 0, buf.length, args.port, args.host, callback);
|
82 | }
|
83 | },
|
84 | close: socket.close.bind(socket),
|
85 | unref: socket.unref.bind(socket)
|
86 | };
|
87 | };
|
88 |
|
89 | const createUdsTransport = args => {
|
90 | try {
|
91 |
|
92 | unixDgram = require('unix-dgram');
|
93 | } catch (err) {
|
94 | throw new Error(
|
95 | 'The library `unix_dgram`, needed for the uds protocol to work, is not installed. ' +
|
96 | 'You need to pick another protocol to use hot-shots. ' +
|
97 | 'See the hot-shots README for additional details.'
|
98 | );
|
99 | }
|
100 | const udsPath = args.path ? args.path : UDS_PATH_DEFAULT;
|
101 | const socket = unixDgram.createSocket('unix_dgram');
|
102 | socket.connect(udsPath);
|
103 |
|
104 | return {
|
105 | emit: socket.emit.bind(socket),
|
106 | on: socket.on.bind(socket),
|
107 | removeListener: socket.removeListener.bind(socket),
|
108 | send: socket.send.bind(socket),
|
109 | close: () => {
|
110 | socket.close();
|
111 |
|
112 |
|
113 | socket.emit('close');
|
114 | },
|
115 | unref: () => {
|
116 | throw new Error('unix-dgram does not implement unref for sockets');
|
117 | }
|
118 | };
|
119 | };
|
120 |
|
121 | const createStreamTransport = (args) => {
|
122 | const stream = args.stream;
|
123 | assert(stream, '`stream` option required');
|
124 |
|
125 | return {
|
126 | emit: stream.emit.bind(stream),
|
127 | on: stream.on.bind(stream),
|
128 | removeListener: stream.removeListener.bind(stream),
|
129 | send: (buf, callback) => stream.write(addEol(buf), callback),
|
130 | close: () => {
|
131 | stream.destroy();
|
132 |
|
133 |
|
134 | if (process.version.split('.').shift() === 'v8') {
|
135 | stream.emit('close');
|
136 | }
|
137 | },
|
138 | unref: () => {
|
139 | throw new Error('stream transport does not support unref');
|
140 | }
|
141 | };
|
142 | };
|
143 |
|
144 | module.exports = (instance, args) => {
|
145 | let transport = null;
|
146 | const protocol = args.protocol || PROTOCOL.UDP;
|
147 |
|
148 | try {
|
149 | if (protocol === PROTOCOL.TCP) {
|
150 | transport = createTcpTransport(args);
|
151 | } else if (protocol === PROTOCOL.UDS) {
|
152 | transport = createUdsTransport(args);
|
153 | } else if (protocol === PROTOCOL.UDP) {
|
154 | transport = createUdpTransport(args);
|
155 | } else if (protocol === PROTOCOL.STREAM) {
|
156 | transport = createStreamTransport(args);
|
157 | } else {
|
158 | throw new Error(`Unsupported protocol '${protocol}'`);
|
159 | }
|
160 | transport.type = protocol;
|
161 | } catch (e) {
|
162 | if (instance.errorHandler) {
|
163 | instance.errorHandler(e);
|
164 | } else {
|
165 | console.error(e);
|
166 | }
|
167 | }
|
168 |
|
169 | return transport;
|
170 | };
|