UNPKG

6.7 kBJavaScriptView Raw
1
2
3var spawn = require("child_process").spawn
4, opened = {}
5, defaults = {
6 bin: "sqlite3",
7 detached: true
8}
9, escapeRe = /'/g
10, unescapeRe = /''/g
11
12module.exports = openDb
13
14function openDb(file, opts) {
15 return opened[file] || new Db(file, opts)
16}
17
18function nop() {}
19
20function Db(file, _opts) {
21 var db = this
22 , opts = Object.assign({}, defaults, _opts)
23 , _col = 0, _len = 0, _type = 0
24 , _row = {}
25 , args = [opts.bin, "-header", file || ""]
26 , bufs = []
27
28 if (file && file !== ":memory:") {
29 opened[db.file = file] = db
30 }
31
32 if (opts.nice) args.unshift("nice", "-n", opts.nice)
33
34 db.queue = []
35 db.headers = db.pending = false
36
37 db.child = spawn(args.shift(), args, opts)
38 ;(
39 opts.pipe ?
40 db.child.stdout.pipe(opts.pipe) :
41 db.child.stdout
42 ).on("data", function(buf) {
43 var code
44 , cut = 0
45 , i = 0
46 , len = buf.length
47 , type = _type
48 , col = _col
49 , row = _row
50
51 if (db.headers === false) {
52 if (buf[0] === 89) {
53 // no response, wait stderr before calling callback
54 return setImmediate(_done)
55 }
56 if (buf[0] === 10 && buf.length === 1) return
57 db.firstRow = row
58 i = cut = buf.indexOf(10) + 1
59 db.headers = buf.toString("utf8", 1, i - 2).split("','")
60 } else if (type === 7) {
61 type = 6
62 i = 1
63 if (buf[0] === 10 || buf[0] === 44) {
64 read(buf, i)
65 }
66 }
67
68 for (; i < len; ) {
69 if (type > 4 && (
70 buf[i++] !== 39 ||
71 buf[i] === 39 && (type = 6) && ++i ||
72 i === len && (type = 7)
73 )) continue
74 code = buf[i++]
75 if (type === 0) {
76 if (code === 89) return _done() // Y
77 type = (
78 code === 39 ? 5 : // '
79 code === 88 ? 4 : // X
80 code === 78 ? (i+=3, 1) : 2 // NULL : numbers
81 )
82 } else if (code === 10 || code === 44) { // \n || ,
83 read(buf, i)
84 }
85 }
86 _col = col
87 _row = row
88 _type = type
89 if (cut === len) return
90 if (bufs.push(buf) === 1) {
91 if (cut > 0) bufs[0] = bufs[0].slice(cut)
92 _len = bufs[0].length
93 } else {
94 _len += len
95 }
96 function read(buf, i) {
97 var j = i
98 if (bufs.length > 0) {
99 bufs.push(buf.slice(0, j))
100 buf = Buffer.concat(bufs, j += _len)
101 _len = _type = bufs.length = 0
102 }
103 row[db.headers[col]] = (
104 type === 1 ? null :
105 type === 2 ? 1 * buf.toString("utf8", cut, j-1) :
106 type === 4 ? (
107 cut + 6 === j ? buf[cut + 3] === 49 :
108 Buffer.from(buf.toString("utf8", cut+2, j-2), "hex")
109 ) :
110 type > 5 ? buf.toString("utf8", cut+1, j-2).replace(unescapeRe, "'") :
111 buf.toString("utf8", cut+1, j-2)
112 )
113 if (code === 10) {
114 if (db.onRow !== null) db.onRow.call(db, row)
115 row = {}
116 col = 0
117 } else {
118 col++
119 }
120 cut = i
121 type = 0
122 }
123 })
124 .on("end", _done)
125
126 db.child.stderr.on("data", function(buf) {
127 db.error = buf.toString("utf8", 0, buf.length - 1)
128 })
129
130 db.run(".mode quote", function(err) {
131 if (err) throw Error(err)
132 })
133
134 function _done() {
135 _row = {}
136 _type = _col = 0
137 db.headers = db.pending = false
138 if (db.onDone !== null) db.onDone.call(db, db.error)
139 if (db.queue.length > 0 && db.pending === false) {
140 db.each.apply(db, db.queue.shift())
141 }
142 }
143}
144
145Db.prototype = {
146 _add: function(query, values, onDone, onRow) {
147 var db = this
148 if (db.pending === true) {
149 db.queue.unshift([query, values, onRow, onDone])
150 } else {
151 db.each(query, values, onRow, onDone)
152 }
153 },
154 each: function(query, values, onRow, onDone) {
155 var db = this
156
157 if (Array.isArray(values)) {
158 query = query.split("?")
159 for (var i = 0, len = values.length; i < len; i++) {
160 query[i] += (
161 typeof values[i] !== "string" ? (
162 values[i] === true ? "X'01'" :
163 values[i] === false ? "X'00'" :
164 values[i] == null ? "null" :
165 Buffer.isBuffer(values[i]) ? "X'" + values[i].toString("hex") + "'" :
166 values[i]
167 ) :
168 "'" + values[i].replace(escapeRe, "''").replace(/\0/g, "") + "'"
169 )
170 }
171 query = query.join("")
172 } else if (typeof values === "function") {
173 onDone = onRow
174 onRow = values
175 }
176 if (db.pending === true) {
177 db.queue.push([query, onRow, onDone])
178 } else {
179 db.pending = true
180 db.error = null
181 db.onRow = typeof onRow === "function" ? onRow : null
182 db.onDone = typeof onDone === "function" ? onDone : null
183 db.child.stdin.write(
184 query.charCodeAt(0) !== 46 && query.charCodeAt(query.length-1) !== 59 ? query + ";\n.print Y\n" :
185 query + "\n.print Y\n"
186 )
187 }
188 },
189 run: function(query, values, onDone) {
190 if (typeof values === "function") {
191 onDone = values
192 values = null
193 }
194 return this.each(query, values, nop, onDone)
195 },
196 all: function(query, values, onDone) {
197 if (typeof values === "function") {
198 onDone = values
199 values = null
200 }
201 var rows = []
202 this.each(query, values, rows.push.bind(rows), function(err) {
203 onDone.call(this, err, rows)
204 })
205 },
206 get: function(query, values, onDone) {
207 if (typeof values === "function") {
208 onDone = values
209 values = null
210 }
211 this.each(query, values, nop, function(err) {
212 onDone.call(this, err, this.firstRow)
213 })
214 },
215 close: function(onDone) {
216 opened[this.file] = null
217 this.each(".quit", nop, onDone)
218 }
219}
220
221openDb.migrate = function migrate(db, dir) {
222 var fs = require("fs")
223 , path = require("./path")
224 , log = require("./log")("db", true)
225
226 db.get("PRAGMA user_version", function(err, res) {
227 if (err) return log.error(err)
228 var patch = ""
229 , current = res.user_version
230 , files = fs.readdirSync(dir).filter(isSql).sort()
231 , latest = parseInt(files[files.length - 1], 10)
232
233 log.info("dir:%s current:%i latest:%i", dir, current, latest)
234
235 function saveVersion(err) {
236 if (err) throw Error(err)
237 db._add("INSERT INTO db_schema_log(ver) VALUES (?)", [current], applyPatch)
238 db._add("PRAGMA user_version=?", [current], function(err) {
239 if (err) throw Error(err)
240 log.info("Migrated to", latest)
241 })
242 }
243
244 function applyPatch(err) {
245 if (err) throw Error(err)
246 for (var ver, f, i = 0; f = files[i++]; ) {
247 ver = parseInt(f, 10)
248 if (ver > current) {
249 current = ver
250 log.info("Applying migration: %s", f)
251 f = fs.readFileSync(path.resolve(dir, f), "utf8").trim().split(/\s*^-- Down$\s*/m)
252 db._add(
253 "REPLACE INTO db_schema(ver,up,down) VALUES(?,?,?)",
254 [ver, f[0], f[1]],
255 saveVersion
256 )
257 db._add(f[0])
258 }
259 }
260 }
261
262 if (latest > current) {
263 applyPatch()
264 } else if (latest < current) {
265 var rows = []
266 db._add(
267 "SELECT down FROM db_schema WHERE id>? ORDER BY id DESC",
268 [current],
269 function(err) {
270 if (err) throw Error(err)
271 var patch = rows.map(r=>r.rollback).join("\n")
272 db._add(patch, null, saveVersion)
273 },
274 rows.push.bind(rows)
275 )
276 }
277 })
278
279 function isSql(name) {
280 return name.split(".").pop()==="sql"
281 }
282}
283
284