UNPKG

8.48 kBJavaScriptView Raw
1
2
3var spawn = require("child_process").spawn
4, opened = {}
5, defaults = {
6 bin: "sqlite3",
7 detached: true
8}
9, escapeRe = /'/g
10, unescapeRe = /''/g
11
12module.exports = openDb
13
/**
 * Return the cached handle for `file`, or create a new Db when none is cached.
 *
 * Only own entries of the cache are considered, so a file name that happens to
 * match a member of Object.prototype (e.g. "constructor") is never mistaken
 * for an open handle. A cache entry that has been reset to null is treated as
 * missing and a fresh Db is created.
 *
 * @param {string} [file] Database file name, forwarded to the Db constructor.
 * @param {Object} [opts] Options, forwarded to the Db constructor. They are
 *   not applied when an already cached handle is returned.
 * @returns {Db} The cached or newly created handle.
 */
function openDb(file, opts) {
  var cached = Object.prototype.hasOwnProperty.call(opened, file) ? opened[file] : null
  return cached || new Db(file, opts)
}
// Expose the migration helper as a property of the exported factory.
openDb.migrate = migrate
18
/**
 * No-op callback used where a row handler is required but rows are ignored.
 *
 * @returns {undefined}
 */
function nop() {}
20
/**
 * Database handle that talks to a spawned command-line process over stdio.
 *
 * The constructor copies `defaults` and `opts` onto the instance, registers
 * the instance in the `opened` cache (except for no file or ":memory:"),
 * spawns the process, attaches the stdout parser and the stderr collector,
 * sends `.mode quote`, and finally starts a migration when `migration` is set.
 *
 * Options read here (all optional):
 *   bin        executable to run (default "sqlite3")
 *   nice       when truthy the command is run as `nice -n <nice> <bin> ...`
 *   pipe       stream that child stdout is piped through before parsing
 *   migration  directory handed to migrate()
 * The instance itself is passed to spawn() as its options object.
 *
 * @param {string} [file] Database file name; "" is passed when omitted.
 * @param {Object} [opts] Options copied onto the instance.
 */
function Db(file, opts) {
  var db = Object.assign(this, defaults, opts)
  // Parser state that survives between stdout chunks: current column index,
  // byte length of the buffered partial value, and type of the value being read.
  , _col = 0, _len = 0, _type = 0
  // Row being assembled.
  , _row = {}
  , args = [db.bin, "-header", file || ""]
  // Chunks holding a value that has not been terminated yet.
  , bufs = []

  if (file && file !== ":memory:") {
    opened[file] = db
  }

  if (db.nice) args.unshift("nice", "-n", db.nice)

  db.file = file
  db.queue = []
  db.headers = db.pending = false

  db.child = spawn(args.shift(), args, db)
  ;(
    db.pipe ?
    db.child.stdout.pipe(db.pipe) :
    db.child.stdout
  ).on("data", function(buf) {
    // Value types used by the parser:
    //   0 at the start of a value      1 NULL            2 number (anything else)
    //   3 line starting with "c"       4 line starting with "R"
    //   6 X'..' literal                7 quoted string   8 quoted string with ''
    //   9 chunk ended right after a quote inside a string, so it is not yet
    //     known whether that quote closes the string or starts a '' pair
    var code
    , cut = 0
    , i = 0
    , len = buf.length
    , type = _type
    , col = _col
    , row = _row

    // Text mode (onRow is a string): collect raw output until the chunk ends
    // with "\nY\n", the marker printed after every query.
    if (typeof db.onRow === "string") {
      if (buf[len - 2] === 89 && buf[len - 1] === 10 && buf[len - 3] === 10) {
        db.onRow += buf.toString("utf8", 0, len - 3)
        setImmediate(_done)
      } else {
        db.onRow += buf.toString()
      }
      return
    }

    if (db.headers === false) {
      if (buf[0] === 89) {
        // no response, wait stderr before calling callback
        return setImmediate(_done)
      }
      if (buf[0] === 10 && buf.length === 1) return
      // First line is the header row: drop the leading quote and the trailing
      // quote + newline, then split on the quoted separator.
      i = cut = buf.indexOf(10) + 1
      db.headers = buf.toString("utf8", 1, i - 2).split("','")
    } else if (type === 9) {
      type = 8
      i = 1
      // read() decides between "next column" and "end of row" by looking at
      // `code`, so it has to hold the delimiter before read() is called.
      // Without this a row whose closing quote and newline arrive in
      // different chunks would never be emitted.
      code = buf[0]
      if (code === 10 || code === 44) {
        read(buf, i)
      }
    }

    for (; i < len; ) {
      // Inside a quoted string: skip ordinary bytes and '' pairs; a quote at
      // the very end of the chunk is ambiguous (type 9). Only a quote followed
      // by another byte falls through to the delimiter handling below.
      if (type > 6 && (
        buf[i++] !== 39 ||
        buf[i] === 39 && (type = 8) && ++i ||
        i === len && (type = 9)
      )) continue
      code = buf[i++]
      if (type === 0) {
        if (code === 89) return setImmediate(_done) // Y
        type = (
          code === 39 ? 7 : // '
          code === 88 ? 6 : // X
          code === 99 ? 3 : // changes: 1 total_changes: 5
          code === 82 ? 4 : // Run Time: real 0.001 user 0.000069 sys 0.000079
          code === 78 ? (i+=3, 1) : 2 // NULL : numbers
        )
      } else if (code === 10 || code === 44) { // \n || ,
        read(buf, i)
      }
    }
    _col = col
    _row = row
    _type = type
    // Everything consumed: nothing to carry over to the next chunk.
    if (cut === len) return
    // Keep the unterminated tail; the first buffered chunk is trimmed to start
    // at the beginning of the pending value.
    if (bufs.push(buf) === 1) {
      if (cut > 0) bufs[0] = bufs[0].slice(cut)
      _len = bufs[0].length
    } else {
      _len += len
    }
    // Consume the value that ends just before index `i` of `buf` (the byte at
    // i - 1 is its delimiter) and store it according to `type`.
    function read(buf, i) {
      var j = i
      if (bufs.length > 0) {
        // Join the buffered chunks with the head of the current one.
        bufs.push(buf.slice(0, j))
        buf = Buffer.concat(bufs, j += _len)
        _len = _type = bufs.length = 0
      }
      if (type === 3) {
        j = buf.toString("utf8", cut, j).split(/[\:\s]+/)
        db.changes = +j[1]
        db.totalChanges = +j[3]
      } else if (type === 4) {
        j = buf.toString("utf8", cut, j).split(/[\:\s]+/)
        db.real = +j[3]
        db.user = +j[5]
        db.sys = +j[7]
      } else {
        row[db.headers[col]] = (
          type === 1 ? null :
          type === 2 ? 1 * buf.toString("utf8", cut, j-1) :
          type === 6 ? (
            // A one-byte X'..' literal is read as a boolean (true when its
            // second hex digit is "1"); longer ones become Buffers.
            cut + 6 === j ? buf[cut + 3] === 49 :
            Buffer.from(buf.toString("utf8", cut+2, j-2), "hex")
          ) :
          type > 7 ? buf.toString("utf8", cut+1, j-2).replace(unescapeRe, "'") :
          buf.toString("utf8", cut+1, j-2)
        )
        if (code === 10) {
          // Newline ends the row.
          if (db.firstRow === null) db.firstRow = row
          if (db.onRow !== null) db.onRow.call(db, row)
          row = {}
          col = 0
        } else {
          col++
        }
      }
      cut = i
      type = 0
    }
  })
  .on("end", _done)

  db.child.stderr.on("data", function(buf) {
    // The last byte of each chunk is dropped (presumably the trailing newline).
    // Chunks belonging to the same query are appended, so an earlier message
    // is not lost when stderr output arrives in several pieces; each() resets
    // db.error to null before the next query is written.
    var text = buf.toString("utf8", 0, buf.length - 1)
    db.error = db.error ? db.error + "\n" + text : text
  })

  db.run(".mode quote", function(err) {
    if (err) throw Error(err)
  })

  if (db.migration) migrate(db, db.migration)

  // Finish the current query: reset parser state, report the result and start
  // the next queued query, if any.
  function _done() {
    _row = {}
    _type = _col = 0
    db.headers = db.pending = false
    if (db.onDone !== null) db.onDone.call(db, db.error)
    else if (db.error !== null) throw Error(db.error + "\n-- " + db.lastQuery)
    // onDone may have started a new query already; only then is pending true.
    if (db.queue.length > 0 && db.pending === false) {
      db.each.apply(db, db.queue.shift())
    }
  }
}
171
// Query API shared by all Db instances. Queries are written to the child
// process one at a time; while one is pending, further calls are queued.
Db.prototype = {
  // Overwriting Db.prototype will ruin constructor
  constructor: Db,
  /**
   * Convert a JavaScript value into the text inserted in place of a `?`.
   * Strings are single-quoted with quotes doubled and NUL characters removed;
   * true/false become X'01'/X'00'; null, undefined and NaN become null;
   * arrays become a comma-separated list of escaped items; Buffers become
   * X'<hex>'; every other value is returned unchanged.
   */
  _esc: function _esc(value) {
    return typeof value !== "string" ? (
      value === true ? "X'01'" :
      value === false ? "X'00'" :
      value == null || value !== value ? "null" :
      Array.isArray(value) ? value.map(_esc).join(",") :
      Buffer.isBuffer(value) ? "X'" + value.toString("hex") + "'" :
      value
    ) :
    "'" + value.replace(escapeRe, "''").replace(/\0/g, "") + "'"
  },
  /**
   * Run a query, calling onRow for every result row and onDone when finished.
   *
   * @param {string} query SQL text or a dot-command.
   * @param {Array|Function} [values] Values substituted for `?` in order, or
   *   the onRow callback when no values are given.
   * @param {Function|string} [onRow] Row callback. An empty string switches
   *   the handle to text mode, where raw output is collected in `this.onRow`.
   * @param {Function} [onDone] Called with the error text (or null).
   * @param {boolean} [immediate] When the handle is busy, put the query at
   *   the front of the queue instead of the back.
   */
  each: function(query, values, onRow, onDone, immediate) {
    var db = this

    if (Array.isArray(values)) {
      // Append value i to the text that precedes the i-th placeholder.
      query = query.split("?")
      for (var i = values.length; i--; ) {
        query[i] += db._esc(values[i])
      }
      query = query.join("")
    } else if (typeof values === "function") {
      onDone = onRow
      onRow = values
    }
    if (db.pending === true) {
      // Values are already substituted, so the queued entry carries none.
      db.queue[immediate === true ? "unshift" : "push"]([query, null, onRow, onDone])
    } else {
      db.pending = true
      db.changes = 0
      db.real = db.user = db.sys = null
      db.error = db.firstRow = null
      db.onRow = typeof onRow === "function" || onRow === "" ? onRow : null
      db.onDone = typeof onDone === "function" ? onDone : null
      db.lastQuery = query
      // Terminate SQL that lacks a trailing ";" (dot-commands start with "."
      // and take none), then print the "Y" marker the stdout parser waits for.
      db.child.stdin.write(
        query.charCodeAt(0) !== 46 && query.charCodeAt(query.length-1) !== 59 ? query + ";\n.print Y\n" :
        query + "\n.print Y\n"
      )
    }
  },
  /**
   * Run a query and ignore its rows. `values` may be omitted, in which case
   * the arguments are (query, onDone, immediate).
   */
  run: function(query, values, onDone, immediate) {
    if (typeof values === "function") {
      this.each(query, null, nop, values, onDone)
    } else {
      this.each(query, values, nop, onDone, immediate)
    }
  },
  /**
   * Run a query and pass all rows to onDone(err, rows). A missing callback is
   * tolerated, as in get().
   */
  all: function(query, values, onDone) {
    if (typeof values === "function") {
      onDone = values
      values = null
    }
    var rows = []
    this.each(query, values, rows.push.bind(rows), function(err) {
      if (typeof onDone === "function") {
        onDone.call(this, err, rows)
      }
    })
  },
  /**
   * Run a query and pass the first row (or null) to onDone(err, row).
   */
  get: function(query, values, onDone) {
    if (typeof values === "function") {
      onDone = values
      values = null
    }
    this.each(query, values, nop, function(err) {
      if (typeof onDone === "function") {
        onDone.call(this, err, this.firstRow)
      }
    })
  },
  /**
   * Insert a row and report the new row id as `lastId` through get().
   *
   * @param {string} query Text placed after "INSERT INTO": the table name,
   *   optionally followed by a column list or a complete VALUES clause.
   * @param {Object|Array} [values] A plain object supplies both the column
   *   list and the values; an array supplies values only. An empty object or
   *   array inserts a row of defaults instead of producing "VALUES ()".
   * @param {Function} [onDone] Called with (err, row).
   */
  insert: function(query, values, onDone) {
    if (values && values.constructor === Object) {
      var keys = Object.keys(values)
      if (keys.length > 0) query += "(" + keys + ")"
      values = Object.values(values)
    }
    if (Array.isArray(values)) {
      query += values.length > 0 ?
        " VALUES (" + Array(values.length).join("?,") + "?)" :
        " DEFAULT VALUES"
    }
    this.get("INSERT INTO " + query + ";SELECT last_insert_rowid() AS lastId;", values, onDone)
  },
  /**
   * Run a query in text mode and pass the raw output to onDone(err, text).
   * A missing callback is tolerated, as in get().
   */
  text: function(query, onDone) {
    this.each(query, null, "", function(err) {
      if (typeof onDone === "function") {
        onDone.call(this, err, this.onRow)
      }
    })
  },
  /**
   * Forget the cached handle for this file and send ".quit" to the process.
   */
  close: function(onDone) {
    opened[this.file] = null
    this.each(".quit", nop, onDone)
  }
}
264
265function migrate(db, dir, _wanted) {
266 var fs = require("fs")
267 , path = require("./path")
268 , log = require("../log")("db:migrate")
269 , files = fs.readdirSync(dir).filter(isSql).sort()
270
271 db.get("PRAGMA user_version", function(err, res) {
272 if (err) return log.error(err)
273 var i = 0
274 , len = files.length
275 , current = res.user_version
276 , latest = parseInt(files[len - 1], 10)
277 , wanted = _wanted < latest ? _wanted : latest
278
279 log.info("%s current:%i latest:%i wanted:%i in:%s", db.file, current, latest, wanted, dir)
280
281 if (latest > current) {
282 for (; i < len && parseInt(files[i], 10) <= current; i++);
283 applyPatch()
284 } else if (latest < current) {
285 var rows = []
286 db.each(
287 "SELECT down FROM db_schema WHERE ver>? ORDER BY ver DESC",
288 [wanted],
289 rows.push.bind(rows),
290 function(err) {
291 if (err) throw Error(err)
292 current = wanted
293 var patch = rows.map(r=>r.down).join("\n")
294 db.run(patch, null, saveVersion, true)
295 },
296 true
297 )
298 }
299
300 function applyPatch(err) {
301 if (err) throw Error(err)
302 var f = files[i++]
303 , ver = parseInt(f, 10)
304 if (ver > current) {
305 log.info("Apply %s", f)
306 f = fs.readFileSync(path.resolve(dir, f), "utf8").trim().split(/\s*^-- Down$\s*/m)
307 current = ver
308 db.run(
309 f[0],
310 function() {
311 db.run(
312 "REPLACE INTO db_schema(ver,up,down) VALUES(?,?,?)",
313 [ver, f[0], f[1]],
314 saveVersion,
315 true
316 )
317 },
318 true
319 )
320 }
321 }
322
323 function saveVersion(err) {
324 if (err) throw Error(err)
325 db.run("PRAGMA user_version=?", [current], function(err) {
326 if (err) throw Error(err)
327 log.info("Migrated to", current)
328 db.run("INSERT INTO db_schema_log(ver) VALUES (?)", [current], applyPatch, true)
329 }, true)
330 }
331 })
332
333 function isSql(name) {
334 return name.split(".").pop() === "sql"
335 }
336}
337
338