UNPKG

5.21 kBJavaScriptView Raw
1// Copyright (c) 2015 Uber Technologies, Inc.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a copy
4// of this software and associated documentation files (the "Software"), to deal
5// in the Software without restriction, including without limitation the rights
6// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7// copies of the Software, and to permit persons to whom the Software is
8// furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19// THE SOFTWARE.
20
21var test = require('tape');
22
23var http = require('http');
24var KafkaServer = require('./lib/kafka-rest-server.js');
25
26var Logger = require('../logger.js');
27var KafkaBackend = require('../backends/kafka.js');
28
test('kafka logging', function (assert) {
    // Test server from ./lib/kafka-rest-server.js; it reports each received
    // message (or an error) to onMessage below.
    var kafkaServer = KafkaServer(onMessage);

    // Logger whose only backend is a kafka backend aimed at the test server.
    var kafkaLogger = Logger({
        meta: {
            team: 'rt',
            project: 'foobar'
        },
        backends: {
            kafka: KafkaBackend({
                proxyHost: 'localhost',
                proxyPort: kafkaServer.port
            })
        }
    });

    kafkaLogger.info('writing to kafka');

    // Verifies the topic, level and text of the first delivered message,
    // then tears down the server and logger and ends the test.
    function onMessage(err, msg) {
        assert.ifError(err, 'no unexpected server error');

        assert.equal(msg.topic, 'rt-foobar');

        var payload = msg.messages[0].payload;

        assert.equal(payload.level, 'info');
        assert.ok(payload.msg.indexOf('writing to kafka') > -1);

        kafkaServer.close();
        kafkaLogger.destroy();
        assert.end();
    }
});
61
test('kafka logging with rest client', function(assert) {
    // Number of POST requests the fake proxy has received.
    var count = 0;
    // Both are assigned once the fake proxy is listening (see onListening).
    var restProxyPort;
    var logger;

    // Fake REST proxy:
    //  - GET  -> responds with a JSON map of 'localhost:<port>' to the topic
    //            list ['rt-foobarx'] (presumably what the REST client reads
    //            during init - confirm against the client).
    //  - POST -> checks the timestamp header, the URL and the posted body,
    //            counts the request and schedules shutdown.
    var restProxyServer = http.createServer(function(req, res) {
        var url = 'localhost:' + restProxyPort;
        var messages = {};
        messages[url] = ['rt-foobarx'];
        if (req.method === 'GET') {
            res.end(JSON.stringify(messages));
        } else if (req.method === 'POST') {
            assert.ok(req.headers.timestamp);
            assert.equal(req.url, '/topics/rt-foobarx');
            var body = '';
            req.on('data', function (data) {
                body += data;
            });
            req.on('end', function () {
                assert.ok(body.indexOf('info') !== -1);
                assert.ok(body.indexOf('writing to kafka') !== -1);
            });
            count++;
            res.end();

            setTimeout(shutdown, 200);
        }
    });
    // allow process to exit with keep-alive sockets
    restProxyServer.on('connection', function (socket) {
        socket.unref();
    });

    // Listen on port 0 so the OS picks an unused port. A randomly chosen
    // port number can collide with a port that is already in use, which
    // makes the test fail intermittently.
    restProxyServer.listen(0, function onListening() {
        restProxyPort = restProxyServer.address().port;

        logger = Logger({
            meta: {
                team: 'rt',
                project: 'foobarx'
            },
            backends: {
                kafka: KafkaBackend({
                    proxyHost: 'localhost',
                    proxyPort: restProxyPort,
                    maxRetries: 3
                })
            }
        });

        setTimeout(function() {
            // wait for rest client init.
            logger.info('writing to kafka');
        }, 1000);
    });

    // Closes the logger, then after one more second checks that exactly one
    // POST arrived, releases the server and logger, and ends the test.
    function shutdown() {
        logger.close(function closed(err) {
            assert.ifError(err, 'no unexpected close error');
            assert.ok(true, 'logger closed');
        });
        setTimeout(function finish() {
            // wait for rest client to flush.
            assert.equal(count, 1);
            restProxyServer.close();
            logger.destroy();
            assert.end();
        }, 1000);
    }
});
126
test('logger -> close', function (assert) {
    // Test server from ./lib/kafka-rest-server.js; every message it reports
    // is checked by onMessage below.
    var kafkaServer = KafkaServer(onMessage);

    var kafkaLogger = Logger({
        meta: {
            team: 'rt',
            project: 'foobar'
        },
        backends: {
            kafka: KafkaBackend({
                proxyHost: 'localhost',
                proxyPort: kafkaServer.port,
                maxRetries: 3
            })
        }
    });

    // warmup
    kafkaLogger.info('writing to kafka');
    setTimeout(runit, 100);

    // Checks the topic, level and text of a delivered message.
    function onMessage(err, msg) {
        assert.ifError(err, 'no unexpected server error');

        assert.equal(msg.topic, 'rt-foobar');

        var payload = msg.messages[0].payload;

        assert.equal(payload.level, 'info');
        assert.ok(payload.msg.indexOf('writing to kafka') > -1);
    }

    // Logs once more, then closes the logger; the test is torn down and
    // ended from inside the close callback.
    function runit() {
        // for real
        kafkaLogger.info('writing to kafka');
        kafkaLogger.close(function closed(err) {
            assert.ifError(err, 'no unexpected close error');
            assert.ok(true, 'logger closed');

            kafkaServer.close();
            kafkaLogger.destroy();
            assert.end();
        });
    }
});
171});