UNPKG

2.16 kBJavaScriptView Raw
1const { Transform } = require('stream');
/**
 * Upper bound, in bytes, on the size of the pending (not yet terminated)
 * data plus the incoming chunk. `_transform` reports an overflow error when
 * appending a chunk to already-buffered data would exceed this limit.
 */
const MAX_BUFFER_SIZE = 1024 * 10;

/**
 * Transform stream that reassembles incoming bytes into `\r\n`-delimited JSON
 * payloads and dispatches each one as an event:
 *
 * - `'tweet'`        payload has a truthy `data` property (also pushed to the
 *                    readable, object-mode side of the stream)
 * - `'api-errors'`   payload has a truthy `errors` property
 * - `'other'`        any other successfully parsed payload
 * - `'heartbeat'`    an empty segment (two consecutive delimiters)
 * - `'stream-error'` a segment that could not be processed; always emitted on
 *                    the parser itself, with the raw text in `error.source`
 *
 * All events except `'stream-error'` are emitted on `options.emitter` when one
 * is supplied, otherwise on the parser itself.
 */
class TweetStreamParser extends Transform {
  /**
   * @param {object} [options] Standard `stream.Transform` options, plus:
   * @param {object} [options.emitter] Object whose `emit()` receives the
   *   parsed events. Defaults to the parser instance.
   * @param {object} [options.timer] Object whose `refresh()` is called after
   *   every processed chunk.
   */
  constructor(options) {
    super({
      ...options,
      readableObjectMode: true,
    });

    if (options) {
      this.emitr = options.emitter;
      this.timer = options.timer;
    }
    this.emitr = this.emitr || this;
  }

  /**
   * Drain every complete (`\r\n`-terminated) segment from `this.chunkBuffer`,
   * emitting the matching event for each. Any trailing bytes that are not yet
   * terminated stay in the buffer for the next call.
   *
   * Failures while handling a segment are reported through a
   * `'stream-error'` event rather than being returned or thrown, so a single
   * bad payload does not fail the whole stream.
   *
   * @returns {void}
   */
  parseJSON() {
    // Nothing has been buffered yet (or the buffer was cleared by _flush).
    if (!this.chunkBuffer) {
      return;
    }

    const EOF = '\r\n';
    const emitter = this.emitr;
    let index;

    while ((index = this.chunkBuffer.indexOf(EOF)) > -1) {
      const chunk = this.chunkBuffer.toString('utf8', 0, index);
      // subarray() is the non-deprecated equivalent of Buffer#slice().
      this.chunkBuffer = this.chunkBuffer.subarray(index + EOF.length);

      if (chunk.length === 0) {
        emitter.emit('heartbeat');
        continue;
      }

      try {
        const json = JSON.parse(chunk);
        if (json.data) {
          emitter.emit('tweet', json);
          this.push(json);
        } else if (json.errors) {
          emitter.emit('api-errors', json);
        } else {
          emitter.emit('other', json);
        }
      } catch (json_error) {
        // Keep the offending text with the error so listeners can inspect it.
        json_error.source = chunk;
        this.emit('stream-error', json_error);
      }
    }
  }

  /**
   * Append `chunk` to the pending buffer and parse every complete segment.
   *
   * If data is already pending and adding `chunk` would push the total past
   * `MAX_BUFFER_SIZE`, the chunk is NOT appended and the callback receives an
   * error carrying `lastChunk`, `buffer` and `max_size_b` properties.
   *
   * @param {Buffer|string} chunk Incoming data.
   * @param {string} encoding Encoding of `chunk` when it is a string.
   * @param {function(Error=): void} callback Stream completion callback.
   * @returns {*} Whatever `callback` returns.
   */
  _transform(chunk, encoding, callback) {
    this.encoding = encoding;

    // Strings only arrive when the stream was created with
    // `decodeStrings: false`; normalise them so the buffer is always a Buffer.
    const incoming = typeof chunk === 'string' ? Buffer.from(chunk, encoding) : chunk;

    let op_error;
    if (incoming) {
      if (!this.chunkBuffer) {
        this.chunkBuffer = incoming;
      } else if (this.chunkBuffer.length + incoming.length > MAX_BUFFER_SIZE) {
        op_error = new Error(
          'Stream overproducing tweet data\n' +
            `buffered ${this.chunkBuffer.length} bytes + incoming ${incoming.length} bytes ` +
            `exceeds the limit of ${MAX_BUFFER_SIZE} bytes`
        );
        op_error.lastChunk = incoming;
        op_error.buffer = this.chunkBuffer;
        op_error.max_size_b = MAX_BUFFER_SIZE;
      } else {
        this.chunkBuffer = Buffer.concat([this.chunkBuffer, incoming]);
      }
    }

    this.parseJSON();

    if (this.timer) {
      this.timer.refresh();
    }
    return callback(op_error);
  }

  /**
   * Parse whatever complete segments remain, then clear the chunk buffer.
   * Trailing bytes without a terminating `\r\n` are discarded.
   *
   * @param {function(Error=): void} callback Stream completion callback.
   * @returns {*} Whatever `callback` returns.
   */
  _flush(callback) {
    if (this.chunkBuffer) {
      this.parseJSON();
      this.chunkBuffer = null;
    }
    return callback();
  }
}
83
84module.exports = TweetStreamParser;