1 | 'use strict';
|
2 |
|
3 | const debug = require('debug')('krh:route');
|
4 | const route = require('koa-router');
|
5 |
|
6 | const util = require('./util');
|
7 | const SqlParser = require('../core/sql/sqlParser');
|
8 | const knexHelper = require('../core/knex-helper');
|
9 | const ObjStream = require('objstream')
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 |
|
18 | let router = new route();
|
19 | let sqlParser = new SqlParser();
|
20 |
|
21 | let mappingExecutor = function* (next) {
|
22 |
|
23 | let possibleConnectionId = yield this.knexHelper.findConnectionByDbkey(this.params.sqlKey);
|
24 |
|
25 | if (possibleConnectionId) {
|
26 | this.params.sqlKey = possibleConnectionId;
|
27 | }
|
28 |
|
29 |
|
30 | if (!this.privilegeHelper.canExecuteSQLMapping(this.state.database, this.state.schema, this.params.sqlKey)) {
|
31 | this.throw(403, '无权限')
|
32 | return;
|
33 | }
|
34 |
|
35 | let sqlMappings = yield this.knexHelper.getMetaConnection().select().from('sql_mapping')
|
36 | .where({ 'sql_key': this.params.sqlKey });
|
37 |
|
38 | debug('SQL IS', sqlMappings);
|
39 |
|
40 | let mappingObject = sqlMappings[0];
|
41 |
|
42 |
|
43 | if (!mappingObject) {
|
44 | this.status = 404;
|
45 | } else {
|
46 |
|
47 | let accessTime = new Date();
|
48 | let source = this.headers['Datahub-Source'] || this.origin || 'unknown';
|
49 |
|
50 |
|
51 | let params = {};
|
52 |
|
53 | if (JSON.stringify(this.query) !== '{}') {
|
54 | params = this.query;
|
55 | } else if (JSON.stringify(this.request.body) !== '{}') {
|
56 | params = this.request.body;
|
57 | }
|
58 |
|
59 | let logId = yield this.knexHelper.logSqlAccessing(
|
60 | this.ip,
|
61 | accessTime,
|
62 | mappingObject.id,
|
63 | mappingObject.sql,
|
64 | JSON.stringify(params),
|
65 | source);
|
66 |
|
67 | this.state.mappingSql = sqlParser.cleanAndReplaceParams(
|
68 | sqlParser.formatSql(mappingObject.sql),
|
69 | params,
|
70 | this.state.schema
|
71 | );
|
72 |
|
73 |
|
74 | yield next;
|
75 |
|
76 | let costTime = new Date().getTime() - accessTime.getTime();
|
77 |
|
78 | yield this.knexHelper.updateCostTime(logId, costTime);
|
79 |
|
80 |
|
81 | }
|
82 | };
|
83 |
|
84 | router
|
85 | .param('schema', function* (schema, next) {
|
86 | if (this.state.schema) {
|
87 | return yield next;
|
88 | }
|
89 |
|
90 | debug('schema param', schema);
|
91 |
|
92 | if (!schema) {
|
93 | this.body = '缺少数据库schema';
|
94 | return this.status = 400;
|
95 | }
|
96 |
|
97 |
|
98 | if (!this.privilegeHelper.canAccessSchema(this.state.database, schema)) {
|
99 | this.throw(403, '无权限')
|
100 | return;
|
101 | }
|
102 |
|
103 | this.state.schema = schema;
|
104 | yield next;
|
105 |
|
106 | })
|
107 |
|
108 | router.post('/sqlMapping', function* () {
|
109 | let dbConfig = yield this.knexHelper.getMetaConnection().insert(this.request.body).into('sql_mapping');
|
110 | this.body = dbConfig;
|
111 | });
|
112 |
|
113 |
|
114 |
|
115 |
|
116 |
|
117 | router.get('/:sqlKey',
|
118 | mappingExecutor,
|
119 | knexHelper.readOnlyTx.bind(this)(),
|
120 | function* () {
|
121 | yield this.state.connection.raw('use :schema:', { schema: this.state.schema });
|
122 |
|
123 | let sql = sqlParser.formatSql(this.state.mappingSql).replace(/\\\\:/g, ":");
|
124 |
|
125 | for (let i in this.query) {
|
126 | try {
|
127 | this.query[i] = JSON.parse(this.query[i]);
|
128 | } catch (e) {
|
129 | this.query[i] = this.query[i];
|
130 | }
|
131 | }
|
132 |
|
133 | try {
|
134 | let result = this.state.connection
|
135 | .raw(sql, this.query).stream().pipe(new ObjStream());
|
136 |
|
137 | this.body = result;
|
138 | } catch (err) {
|
139 | this.status = 500;
|
140 | this.body = err
|
141 | }
|
142 | });
|
143 |
|
144 | router.get('/:sqlKey/pagination(.*)',
|
145 | mappingExecutor,
|
146 | knexHelper.readOnlyTx.bind(this)(),
|
147 | function* () {
|
148 | yield this.state.connection.raw('use :schema:', { schema: this.state.schema });
|
149 |
|
150 | let sql = sqlParser.formatSql(this.state.mappingSql).replace(/\\\\:/g, ":");
|
151 | try {
|
152 | let data = yield this.knexHelper.paginate(sql,
|
153 | this.query,
|
154 | this.state.connection);
|
155 |
|
156 | this.body = util.wrapPagingResult(data.result, data.options);
|
157 | } catch (err) {
|
158 | this.status = 500;
|
159 | this.body = err
|
160 | }
|
161 |
|
162 | });
|
163 |
|
164 |
|
165 | router.post('/:sqlKey', mappingExecutor,
|
166 | function* () {
|
167 | yield this.state.connection.raw('use :schema:', { schema: this.state.schema });
|
168 |
|
169 | let sql = sqlParser.formatSql(this.state.mappingSql).replace(/\\\\:/g, ":");
|
170 |
|
171 | for (let i in this.request.body) {
|
172 | try {
|
173 | this.request.body[i] = JSON.parse(this.request.body[i]);
|
174 | } catch (e) {
|
175 | this.request.body[i] = this.request.body[i];
|
176 | }
|
177 | }
|
178 |
|
179 | try {
|
180 | let result = this.state.connection
|
181 | .raw(sql, this.request.body).stream().pipe(new ObjStream());
|
182 |
|
183 | this.body = result;
|
184 | } catch (err) {
|
185 | this.status = 500;
|
186 | this.body = err
|
187 | }
|
188 | });
|
189 |
|
190 | module.exports = router; |
\ | No newline at end of file |