1 | node_url = require('url')
|
2 | _ = require 'underscore'
|
3 | _.str = require 'underscore.string'
|
4 | _.mixin(_.str.exports())
|
5 | {Readable} = require 'stream'
|
6 |
|
7 |
|
8 |
|
9 |
|
10 |
|
11 | class QueryStream extends Readable
|
12 | constructor: (@_query) ->
|
13 | super { objectMode: true }
|
14 | @running = false
|
15 |
|
16 | _read: () => @run()
|
17 |
|
18 | run: () =>
|
19 | return if @running
|
20 | @running = true
|
21 | @_query.exec (err, obj) =>
|
22 | return @emit('error', err) if err?
|
23 | return @emit('error', "Cannot stream object #{JSON.stringify(obj, null, 2)}") unless obj?.kind?
|
24 | resource = _.strRightBack(obj.kind, '#')
|
25 | if _(obj?[resource]).isArray()
|
26 | @push r for r in obj[resource]
|
27 | else if obj?
|
28 | @push obj
|
29 | if not obj.nextPageToken
|
30 | @push null
|
31 | return
|
32 | if obj.nextPageToken
|
33 | parsed = node_url.parse(@_query._opts.uri)
|
34 | if parsed.search.indexOf('pageToken') != -1
|
35 | new_search = "?pageToken=#{obj.nextPageToken}&#{_(parsed.search).strRight('&')}"
|
36 | else
|
37 | new_search = "?pageToken=#{obj.nextPageToken}&#{parsed.search.slice(1)}"
|
38 | @_query._opts.uri = "#{parsed.protocol}//#{parsed.host}#{parsed.pathname}#{new_search}"
|
39 | @running = false
|
40 | process.nextTick @run
|
41 |
|
42 | class Query
|
43 | constructor: (@_google_api, @_opts) ->
|
44 | exec: (cb) =>
|
45 | @_google_api.oauth2_request @_opts, @_google_api.constructor.response_handler(cb)
|
46 | stream: () => new QueryStream @
|
47 |
|
48 | module.exports =
|
49 | Query: Query
|
50 | QueryStream: QueryStream
|