UNPKG

20.6 kBPlain TextView Raw
1/**
2 * 研究阿里云的OTS表格设计将session、cache、user三种缓存信息算法构建在此之上。
3 */
4const TableStore = require('tablestore')
5const Long = TableStore.Long
6const _ = require('lodash')
7
8abstract class NoSqlInterface {
9 constructor(table: string, config: string | {}[] = 'default') {
10 }
11
12 // 约定所有表都需要有自增主键id作为内部唯一标识码,其余三个主键必须全部为字符串。(后端需要扩展int和date基本数据类型以及操作符重载实现优化开发)
13 // 属性字段可以根据业务演变任意的扩展增加由业务代码对于老数据不存在新增字段值的情况做兼容处理
14 public abstract async insert(id, kvt: { [index: string]: string | { value: string, timestamp: number } }) ;
15
16 // 条件更新必须要填写主键记录值(仅仅允许更新属性字段而主键字段是不允许更新的,
17 // 整列更新和删除列属于运维操作禁止应用中使用需要单独接口以及权限认证)
18 public abstract async update(id: string, kvt: { [index: string]: string | null | number }) ;
19
20 // 以主键作为条件删除记录值
21 public abstract async delete(id: string);
22
23 // 分页查询一次检索N条记录
24 public abstract async query(id: string, keys: string[] | null, max_version: number): Promise<any>;
25
26 // 销毁表
27 public abstract async destroy();
28
29 // 创建表
30 public abstract async create();
31
32 // 查询当前表的描述信息
33 public abstract async describe();
34
35 // 变更表
36 public abstract async change(param: {
37 maxVersions?: number, timeoutSeconds: number,
38 reservedThroughputRead?: number, reservedThroughputWrite?: number
39 });
40
41 // 重置表
42 public abstract async reset();
43}
44
45// export async function test() {
46// const ots = new NoSqlAliyunTablestore('session_test3')
47// // let out = await ots.create(60)
48// // FIXME 首次创建表需要有一定的系统延迟时间,需要放到install过程中进行维护。
49// // let out = await ots.insert('x', {a: 'aaa', b: {value: 'bbbb', timestamp: Date.now()}})
50// // let out = await ots.update('x', {a: 'aaa2', b: 'bbb2'})
51// // let out = await ots.update('x', {a: null, b: 1529666053100})
52// // let out = await ots.delete('x')
53//
54// let out
55// // out = await ots.insert('x', {a: 'aaa', b: {value: 'bbbb', timestamp: Date.now()}})
56// // out = await ots.query('x') // 取全部的字段
57// // out = await ots.query('x', ['a']) // 取部分字段
58// // out = await ots.query('y') // 取不存在的主键
59//
60// // out = await ots.create(24*60*60,2)
61// // out = await ots.insert('z', {a: 'aaa'+Date(), b: {value: 'bbbb'+Date(), timestamp: Date.now()}})
62// // out = await ots.update('z', {a: 'aaa'+Date(), b: 'bbbb'+Date()})
63// // out = await ots.query('z', null, 2)
64// out = await ots.query('z', null, 1)
65// // 用最佳方案去实现字段
66// return out
67// }
68
69// 对于阿里云tablestore的单表功能简单封装处理(一个主键并作为分区唯一区分一个记录,仅用于缓存功能实现。
70// 封装特点:每个记录一条主键并作为分区,每个记录有无数个kv值可供配置,每个kv值都有一个时间戳超时时间在表上单独配置。)
71export class NoSqlAliyunTablestore extends NoSqlInterface {
72 private _table: string
73 private _client: any
74 private _schema: any
75
76 set table(value) {
77 this._table = value
78 }
79
80 constructor(table: string, config: string = 'default') {
81 super(table, config)
82 const instances = xconfig('plugins.nosql')
83 xassert(instances && _.isPlainObject(instances) && config in instances,
84 ERR$CONFIG, {instances})
85 this._table = table
86 this._schema = {
87 KEYS: {
88 id: 'string' // 表的唯一记录主键值同时也是分区
89 }
90 }
91
92 this._client = new TableStore.Client({
93 accessKeyId: instances[config].OTS_ACCESS_KEY_ID,
94 secretAccessKey: instances[config].OTS_SECRETE_ACCESS_KEY,
95 endpoint: instances[config].OTS_ENDPOINT,
96 instancename: instances[config].OTS_INSTANCENAME
97 })
98 }
99
100 // 约定所有表都需要有自增主键id作为内部唯一标识码,其余三个主键必须全部为字符串。(后端需要扩展int和date基本数据类型以及操作符重载实现优化开发)
101 // 属性字段可以根据业务演变任意的扩展增加由业务代码对于老数据不存在新增字段值的情况做兼容处理
102 public async insert(id: string, kvt: { [index: string]: string | { value: string, timestamp: number } }) {
103 xassert(Object.keys(kvt).length <= 128) // 规避跨行限制总计属性128个
104 const __this__ = this
105 // var currentTimeStamp = Date.now();
106 const params = {
107 tableName: this._table,
108 // 插入的时候需要确保不存在对应的数据以防止出错
109 condition: new TableStore.Condition(TableStore.RowExistenceExpectation.EXPECT_NOT_EXIST, null),
110 primaryKey: [],
111 attributeColumns: [],
112 // 按照下面数据格式进行schema定义的验证以及数据类型转换
113 // primaryKey: [{'gid': Long.fromNumber(20013)}, {'uid': Long.fromNumber(20013)}],
114 // attributeColumns: [
115 // {'col1': '表格存储'},
116 // {'col2': '2', 'timestamp': currentTimeStamp}, // 允许修改时间戳乐观锁功能实现暂不支持
117 // {'col3': 3.1},
118 // {'col4': -0.32},
119 // {'col5': Long.fromNumber(123456789)}
120 // ],
121 // primaryKey: [
122 // {'short_id': 'pk1'},
123 // {[AUTO_KEY_NAME]: TableStore.PK_AUTO_INCR}
124 // ],
125 // attributeColumns: [
126 // {'appcode': 'app1'}
127 // ],
128 returnContent: {returnType: TableStore.ReturnType.Primarykey}
129 }
130
131 // 拼接主键以及属性字段值
132 params.primaryKey = [{'id': id}]
133 for (let k in kvt) {
134 xassert(k != 'id' && k != 'timestamp') // 两个预留内部标识符不可作为属性名
135 if (_.isString(kvt[k])) {
136 params.attributeColumns.push({
137 [k]: kvt[k]
138 })
139 } else {
140 params.attributeColumns.push({
141 [k]: kvt[k]['value'],
142 timestamp: kvt[k]['timestamp']
143 })
144 }
145 }
146
147 return new Promise((resolve, reject) => {
148 try {
149 __this__._client.putRow(params, function (err, out) {
150 if (err) {
151 xthrow(new Error(err), reject, {params, out})
152 return
153 }
154 // 正常返回的数据格式
155 //{"consumed":{"capacity_unit":{"read":0,"write":1}},"row":{
156 // "primaryKey":[{"name":"short_id","value":"abcd"},{"name":"id","value":1520765502347000}],
157 // "attributes":[]},
158 // "RequestId":"00056720-cf8d-d4a8-8ae8-970a17894ce6"}
159 resolve(out)
160 })
161 } catch (err) {
162 xthrow(err, reject)
163 }
164 })
165 }
166
167 public async update(id: string, kvt: { [index: string]: string | null | number }) {
168 return await this._update_or_replace(id, kvt, false)
169 }
170
171 public async replace(id: string, kvt: { [index: string]: string | null | number }) {
172 return await this._update_or_replace(id, kvt, true)
173 }
174
175 // 条件更新必须要填写主键记录值(仅仅允许更新属性字段而主键字段是不允许更新的,
176 // 整列更新和删除列属于运维操作禁止应用中使用需要单独接口以及权限认证)
177 private async _update_or_replace(id: string, kvt: { [index: string]: string | null | number },
178 isIgnoreRowNonExist: boolean = false) {
179 xassert(Object.keys(kvt).length > 0 && Object.keys(kvt).length <= 128) // 规避跨行限制总计属性128个
180 const __this__ = this
181 const params = {
182 tableName: this._table,
183 condition: new TableStore.Condition(isIgnoreRowNonExist ?
184 TableStore.RowExistenceExpectation.IGNORE :
185 TableStore.RowExistenceExpectation.EXPECT_EXIST, null),
186 primaryKey: [{id}],
187 // updateOfAttributeColumns: [{'PUT': [{'col1': 'test6'}]}]
188 updateOfAttributeColumns: []
189 // updateOfAttributeColumns: [
190 // { 'PUT': [{ 'col4': Long.fromNumber(4) }, { 'col5': '5' }, { 'col6': Long.fromNumber(6) }] },
191 // { 'DELETE': [{ 'col1': Long.fromNumber(1496826473186) }] }, // 删除指定时间戳版本数据
192 // { 'DELETE_ALL': ['col2'] } // 删除所有版本的字段数据
193 // ]
194 }
195
196 const PUT = []
197 const DELETE = []
198 const DELETE_ALL = []
199 for (let k in kvt) {
200 // 如果变量值为null类型则表示删除对应的字段值,如果为整数表示删除指定时间戳版本,否则表示添加或更新对应字段值。
201 if (!kvt[k]) {
202 DELETE_ALL.push(k)
203 } else if (_.isInteger(kvt[k])) {
204 DELETE.push({[k]: Long.fromNumber(kvt[k])})
205 } else if (_.isString(kvt[k])) {
206 PUT.push({[k]: kvt[k]})
207 } else {
208 xassert(false, ERR$PARAM, {id, kvt})
209 }
210 }
211 if (PUT.length > 0) params.updateOfAttributeColumns.push({PUT})
212 if (DELETE.length > 0) params.updateOfAttributeColumns.push({DELETE})
213 if (DELETE_ALL.length > 0) params.updateOfAttributeColumns.push({DELETE_ALL})
214
215 return new Promise((resolve, reject) => {
216 try {
217 __this__._client.updateRow(params, function (err, data) {
218 if (err) {
219 xthrow(new Error(err), reject, {params, data})
220 return
221 }
222 resolve()
223 })
224 } catch (err) {
225 xthrow(err, reject)
226 }
227 })
228 }
229
230 // 以主键作为条件删除记录值
231 public async delete(id: string) {
232 const __this__ = this
233 const params = {
234 tableName: this._table,
235 condition: new TableStore.Condition(TableStore.RowExistenceExpectation.IGNORE, null),
236 // primaryKey: [{ 'gid': Long.fromNumber(8) }, { 'uid': Long.fromNumber(80) }]
237 primaryKey: [{id}]
238 }
239
240 return new Promise((resolve, reject) => {
241 try {
242 __this__._client.deleteRow(params, function (err, data) {
243 if (err) {
244 xthrow(new Error(err), reject, {params, data})
245 return
246 }
247 resolve()
248 })
249 } catch (err) {
250 xthrow(err, reject)
251 }
252 })
253 }
254
255 // 范围查询需要数据自动同步到opensearch进行索引同步后进行各种复杂的查询操作实现免运维系统的实现
256 // 单表逻辑条件的简单and与equal的查询,返回满足条件的第一条记录 (合并为一个查询兼容mongodb的查询扩展)
257 public async query(id: string, keys: string[] | null = null, max_version: number = 1): Promise<any> {
258
259 const __this__ = this
260 const params = {
261 tableName: this._table,
262 columnsToGet: keys,
263 // columns_to_get 获取期望的列最多128个一次获取总数,应用上应该将KEY视为分组总数。
264 // 如何规避宽表的分页限制?? FIXME 先从应用上规避限制一个应用最多不超过128个属性,通过JSON进行扩展存储。
265 primaryKey: [{id}],
266 // primaryKey: [{'gid': Long.fromNumber(20013)}, {'uid': Long.fromNumber(20013)}],
267 columnFilter: null,
268 maxVersions: max_version,
269 }
270
271 return new Promise((resolve, reject) => {
272 __this__._client.getRow(params, function (err, data) {
273 if (err) {
274 xthrow(new Error(err), reject, {params, data})
275 return
276 }
277
278 // 返回数据格式类型进行转换处理
279 // {"consumed":{"capacity_unit":{"read":1,"write":0}},
280 // "row":{"primaryKey":[{"name":"gid","value":20013},{"name":"uid","value":20013}],
281 // "attributes":[{"columnName":"col1","columnValue":"表格存储","timestamp":1520734520286},
282 // {"columnName":"col2","columnValue":"2","timestamp":1520734520064},
283 // {"columnName":"col3","columnValue":3.1,"timestamp":1520734520286},
284 // {"columnName":"col4","columnValue":-0.32,"timestamp":1520734520286},
285 // {"columnName":"col5","columnValue":123456789,"timestamp":1520734520286}]
286 // },
287 // "next_token":null,"RequestId":"00056719-e2ad-73b1-dbd8-970a19522f4b"}
288
289 // 将数据结果进行转换处理合并为一个普通对象给应用使用
290 try {
291 let out: any = {}
292 if (!data.row) {
293 return resolve(null)
294 }
295 if (data.row.primaryKey) {
296 for (let k in data.row.primaryKey) {
297 out[data.row.primaryKey[k].name] = data.row.primaryKey[k].value
298 }
299 }
300 if (data.row.attributes) {
301 for (let k in data.row.attributes) {
302 out[data.row.attributes[k].columnName] = data.row.attributes[k].columnValue
303 }
304 }
305 // xlog(data) // TODO 当存在多个版本数据的时候解析不正确,应该是多个版本的属性值字段的组合才正确。
306 resolve(_.isEmpty(out) ? null : out)
307 } catch (err) {
308 xthrow(new Error(err), reject, {params, data})
309 return
310 }
311 })
312 })
313 }
314
315 // 销毁表
316 public async destroy() {
317 try {
318 await this._destroy()
319 } catch (err) {
320 return
321 }
322 // 10秒钟等待超时销毁表正常完成
323 for (let i = 0; i < 100; i++) {
324 try {
325 let out = this.describe()
326 xlog(out)
327 } catch (err) {
328 await xsleep(100)
329 break
330 }
331 }
332 }
333
334 private async _destroy() {
335 const __this__ = this
336 const params = {
337 tableName: this._table
338 }
339 return new Promise((resolve, reject) => {
340 __this__._client.deleteTable(params, function (err, data) {
341 if (err) {
342 xthrow(new Error(err), reject, {params})
343 return
344 }
345 resolve()
346 })
347 })
348 }
349
350 // 创建表
351 public async create(timeout = -1, max_versions = 1) {
352 await this._create(timeout, max_versions)
353 // 10秒钟等待超时创建表正常完成
354 for (let i = 0; i < 100; i++) {
355 try {
356 let out = this.describe()
357 xlog(out)
358 } catch (err) {
359 await xsleep(100)
360 continue
361 }
362 break
363 }
364 }
365
366 private async _create(timeout = -1, max_versions = 1) {
367 // OTS最长超时时间为1天的兼容处理
368 if (timeout != -1 && timeout < 86400) {
369 timeout = 86400
370 }
371 const __this__ = this
372 const params = {
373 tableMeta: {
374 tableName: this._table,
375 primaryKey: [] as any[],
376 // primaryKey: [
377 // {
378 // name: 'short_id',
379 // type: 'STRING'
380 // },
381 // {
382 // name: AUTO_KEY_NAME,
383 // type: 'INTEGER',
384 // option: 'AUTO_INCREMENT',
385 // }
386 // ]
387 },
388 reservedThroughput: {
389 capacityUnit: {
390 read: 0,
391 write: 0
392 }
393 },
394 tableOptions: {
395 timeToLive: timeout,// 数据的过期时间, 单位秒, -1代表永不过期. 假如设置过期时间为一年, 即为 365 * 24 * 3600.
396 maxVersions: max_versions,// 保存的最大版本数, 设置为1即代表每列上最多保存一个版本(保存最新的版本).
397 }
398 }
399 // 自动转换schema定义为OTS的数据结构
400 for (let k in this._schema.KEYS) {
401 let obj = {
402 name: k,
403 type: _.upperCase(this._schema.KEYS[k])
404 }
405 // if (k == AUTO_KEY_NAME) {
406 // obj['option'] = 'AUTO_INCREMENT'
407 // }
408 params.tableMeta.primaryKey.push(obj)
409 }
410 // bugfix解决第一个分区键不能为自增主键的问题默认补上一个_id字段值的问题(只做提示暂不解决约定一个_id分区键强制设置)
411 // "400: \n\u0013OTSParameterInvalid\u0012*first primary key can't be AUTO_INCREMENT."
412
413 let out = await new Promise((resolve, reject) => {
414 __this__._client.createTable(params, function (err, data) {
415 if (err) {
416 xthrow(new Error(err), reject, {params, data})
417 return
418 }
419 resolve()
420 })
421 })
422
423 // FIXME 表创建后有一定的延时时间才能生效,需要维持一定的等待时间确保正常执行完成。
424 // TODO 延时算法自动完成时间戳的处理。
425 return out
426 }
427
428 // 查询当前表的描述信息
429 public async describe() {
430 const __this__ = this
431 const params = {
432 tableName: this._table
433 }
434 return new Promise((resolve, reject) => {
435 __this__._client.describeTable(params, function (err, data) {
436 if (err) {
437 xthrow(new Error(err), reject, {params})
438 return
439 }
440 resolve(data)
441 })
442 })
443 }
444
445 // 表配置信息的更新处理(时间戳以及版本号)
446 public async change(param: {
447 maxVersions?: number, timeoutSeconds: number,
448 reservedThroughputRead?: number, reservedThroughputWrite?: number
449 }) {
450 const __this__ = this
451 const params = {
452 tableName: this._table,
453 tableOptions: {
454 // 保存的最大版本数, 设置为1即代表每列上最多保存一个版本(保存最新的版本).
455 maxVersions: param.maxVersions ? param.maxVersions : 1,
456 // 数据的过期时间, 单位秒, -1代表永不过期. 假如设置过期时间为一年, 即为 365 * 24 * 3600
457 timeToLive: param.timeoutSeconds ? param.timeoutSeconds : -1,
458
459 },
460 reservedThroughput: {
461 capacityUnit: {
462 // 为了提升并发度确保预留最小读写数量的配置避免服务共享可能产生的资源竞争不稳定问题
463 read: param.reservedThroughputRead ? param.reservedThroughputRead : 0,
464 write: param.reservedThroughputWrite ? param.reservedThroughputWrite : 0,
465 }
466 },
467 }
468 return new Promise((resolve, reject) => {
469 __this__._client.updateTable(params, function (err, data) {
470 if (err) {
471 xthrow(new Error(err), reject, {params})
472 return
473 }
474 resolve(data)
475 })
476 })
477 }
478
479 // 重置表
480 public async reset() {
481 try {
482 await this.destroy()
483 } catch (err) {
484 // ignore error
485 }
486 await this.create()
487 }
488}