UNPKG

2.1 kB · JavaScript · View Raw
1const { Transform } = require('stream');
// Cap (50 MiB) on buffered, not-yet-parsed bytes; _transform reports an error once the buffer has grown past it.
2const MAX_BUFFER_SIZE = 1024 * 1024 * 50;
3
/**
 * Transform stream that splits incoming bytes into `\r\n`-delimited messages,
 * parses each one as JSON and routes it by shape:
 *
 *  - objects with a truthy `data` property -> `'tweet'` event + pushed downstream
 *  - objects with a truthy `errors` property -> `'api-errors'` event
 *  - any other parsed value                 -> `'other'` event
 *  - empty message (bare delimiter)         -> `'heartbeat'` event
 *
 * The readable side is always in object mode. Bytes that do not yet form a
 * complete message are kept in `this.chunkBuffer` until the delimiter arrives.
 */
class TweetStreamParser extends Transform {
  /**
   * @param {object} [options] - Forwarded to `Transform`; `readableObjectMode`
   *   is always forced to `true`. The following extra keys are read here:
   * @param {object} [options.emitter] - Object whose `emit(name, payload)` receives
   *   the `'tweet'`, `'api-errors'`, `'other'` and `'heartbeat'` events.
   *   Defaults to the stream itself.
   * @param {object} [options.timer] - If set, its `refresh()` method is called
   *   after every processed chunk.
   * @param {number} [options.maxBufferSize] - Positive byte limit for unparsed
   *   data; defaults to the module-level `MAX_BUFFER_SIZE`.
   */
  constructor(options) {
    super({
      ...options,
      readableObjectMode: true
    });

    if (options) {
      this.emitr = options.emitter;
      this.timer = options.timer;
    }
    this.emitr = this.emitr || this;

    // Only fall back to the module default when no usable limit was supplied.
    const requestedLimit = options ? options.maxBufferSize : undefined;
    this.maxBufferSize =
      Number.isFinite(requestedLimit) && requestedLimit > 0
        ? requestedLimit
        : MAX_BUFFER_SIZE;
  }

  /**
   * Drain every complete `\r\n`-terminated message from `this.chunkBuffer`,
   * leaving any trailing partial message in place for the next chunk.
   *
   * Anything thrown while handling a message (invalid JSON, a `null` JSON
   * value, or an exception from an event listener) is caught, tagged with the
   * raw text as `error.source`, and emitted as `'stream-error'` on the stream
   * itself - not on `options.emitter`.
   */
  parseJSON() {
    // Nothing buffered yet (or already flushed): avoid dereferencing null/undefined.
    if (!this.chunkBuffer) {
      return;
    }

    const DELIMITER = '\r\n';
    const emitter = this.emitr;
    let index;
    while ((index = this.chunkBuffer.indexOf(DELIMITER, 0, 'utf8')) > -1) {
      const message = this.chunkBuffer.toString('utf8', 0, index);
      // subarray() is the non-deprecated equivalent of Buffer#slice (no copy).
      this.chunkBuffer = this.chunkBuffer.subarray(index + DELIMITER.length);

      if (message.length > 0) {
        try {
          const json = JSON.parse(message);
          if (json.data) {
            emitter.emit('tweet', json);
            this.push(json);
          } else if (json.errors) {
            emitter.emit('api-errors', json);
          } else {
            emitter.emit('other', json);
          }
        } catch (parseError) {
          parseError.source = message;
          this.emit('stream-error', parseError);
        }
      } else {
        emitter.emit('heartbeat');
        // NOTE(review): push() with no argument enqueues `undefined` on the
        // object-mode readable side, so consumers see one `undefined` entry
        // per heartbeat - kept as-is; confirm this is intended.
        this.push();
      }
    }
  }

  /**
   * Append `chunk` to the pending buffer, parse whatever is complete, refresh
   * the optional timer and signal completion.
   *
   * If the pending buffer already exceeds `this.maxBufferSize` before this
   * chunk is appended, the chunk is dropped and `callback` receives an Error
   * carrying `lastChunk`, `buffer` and `max_size_b` properties.
   *
   * @param {Buffer|string} chunk - Incoming data; strings are converted using `encoding`.
   * @param {string} encoding - Encoding reported by the stream machinery.
   * @param {function(Error=): void} callback - Completion callback.
   */
  _transform(chunk, encoding, callback) {
    this.encoding = encoding;

    // With `decodeStrings: false` chunks arrive as strings; the buffer
    // arithmetic below needs real Buffers.
    const incoming = typeof chunk === 'string' ? Buffer.from(chunk, encoding) : chunk;

    let overflowError;
    if (this.chunkBuffer && this.chunkBuffer.length > this.maxBufferSize) {
      overflowError = new Error(
        'Stream overproducing tweet data ' +
          `(buffered ${this.chunkBuffer.length} bytes, limit ${this.maxBufferSize} bytes)`
      );
      // Expose the diagnostic data on the error instead of interpolating an
      // object into the message (which only ever produced "[object Object]").
      overflowError.lastChunk = chunk;
      overflowError.buffer = this.chunkBuffer;
      overflowError.max_size_b = this.maxBufferSize;
    } else if (this.chunkBuffer && incoming) {
      this.chunkBuffer = Buffer.concat([this.chunkBuffer, incoming]);
    } else if (incoming) {
      this.chunkBuffer = incoming;
    }

    this.parseJSON();
    if (this.timer) {
      this.timer.refresh();
    }
    callback(overflowError);
  }

  /**
   * Parse any complete messages still buffered, then clear the chunk buffer.
   * Trailing bytes that never received a `\r\n` terminator are discarded.
   *
   * @param {function(Error=): void} callback - Completion callback.
   */
  _flush(callback) {
    if (this.chunkBuffer) {
      this.parseJSON();
      this.chunkBuffer = null;
    }
    callback();
  }
}
83
// Expose the parser class as this module's export.
84module.exports = TweetStreamParser;