1 | const { Transform } = require('stream');
|
2 | const MAX_BUFFER_SIZE = 1024 * 10;
|
3 |
|
4 | class TweetStreamParser extends Transform {
|
5 | constructor(options) {
|
6 | super({
|
7 | ...options,
|
8 | readableObjectMode: true
|
9 | });
|
10 |
|
11 | if(options) {
|
12 | this.emitr = options.emitter;
|
13 | this.timer = options.timer;
|
14 | }
|
15 | this.emitr = this.emitr || this;
|
16 | }
|
17 |
|
18 | parseJSON() {
|
19 | const EOF = '\r\n';
|
20 | const emitter = this.emitr;
|
21 | let error, index;
|
22 | while( (index = this.chunkBuffer.indexOf(EOF, 'utf8')) > -1 ) {
|
23 | let chunk = this.chunkBuffer.toString('utf8', 0, index);
|
24 | this.chunkBuffer = this.chunkBuffer.slice(index + EOF.length);
|
25 | if(chunk.length > 0) {
|
26 | try {
|
27 | let json = JSON.parse(chunk);
|
28 | if(json.data) {
|
29 | emitter.emit('tweet', json);
|
30 | this.push(json);
|
31 | } else if(json.errors) {
|
32 | emitter.emit('api-errors', json);
|
33 | } else {
|
34 | emitter.emit('other', json);
|
35 | }
|
36 | } catch(json_error) {
|
37 | json_error.source = chunk;
|
38 | error = json_error;
|
39 | this.emit('stream-error', error);
|
40 | }
|
41 | } else {
|
42 | emitter.emit('heartbeat');
|
43 | }
|
44 | }
|
45 |
|
46 |
|
47 | }
|
48 |
|
49 | _transform(chunk, encoding, callback) {
|
50 | this.encoding = encoding;
|
51 | let op_error;
|
52 | if(this.chunkBuffer && this.chunkBuffer.length + chunk.length < MAX_BUFFER_SIZE) {
|
53 | const err_info = {
|
54 | lastChunk: chunk,
|
55 | buffer: this.chunkBuffer,
|
56 | max_size_b: MAX_BUFFER_SIZE
|
57 | };
|
58 | op_error = new Error(`Stream overproducing tweet data\n${err_info}`);
|
59 | } else if(this.chunkBuffer && chunk) {
|
60 | this.chunkBuffer = Buffer.concat([this.chunkBuffer, chunk]);
|
61 | } else if(chunk) {
|
62 | this.chunkBuffer = chunk;
|
63 | }
|
64 | let parse_error = this.parseJSON();
|
65 | if(this.timer) {
|
66 | this.timer.refresh();
|
67 | }
|
68 | return callback(op_error || parse_error);
|
69 | }
|
70 |
|
71 | |
72 |
|
73 |
|
74 | _flush(callback) {
|
75 | let error;
|
76 | if(this.chunkBuffer) {
|
77 | error = this.parseJSON();
|
78 | this.chunkBuffer = null;
|
79 | }
|
80 | return callback(error);
|
81 | }
|
82 | }
|
83 |
|
84 | module.exports = TweetStreamParser;
|