UNPKG

10.4 kBJavaScriptView Raw
1// Copyright (c) 2019, 2024, Oracle and/or its affiliates.
2
3//-----------------------------------------------------------------------------
4//
5// This software is dual-licensed to you under the Universal Permissive License
6// (UPL) 1.0 as shown at https://oss.oracle.com/licenses/upl and Apache License
7// 2.0 as shown at http://www.apache.org/licenses/LICENSE-2.0. You may choose
8// either license.
9//
10// If you elect to accept the software under the Apache License, Version 2.0,
11// the following applies:
12//
13// Licensed under the Apache License, Version 2.0 (the "License");
14// you may not use this file except in compliance with the License.
15// You may obtain a copy of the License at
16//
17// https://www.apache.org/licenses/LICENSE-2.0
18//
19// Unless required by applicable law or agreed to in writing, software
20// distributed under the License is distributed on an "AS IS" BASIS,
21// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
22// See the License for the specific language governing permissions and
23// limitations under the License.
24//
25//-----------------------------------------------------------------------------
26
27'use strict';
28
29const { Buffer } = require('buffer');
30const errors = require('./errors.js');
31const nodbUtil = require('./util.js');
32const AqDeqOptions = require('./aqDeqOptions.js');
33const AqEnqOptions = require('./aqEnqOptions.js');
34const AqMessage = require('./aqMessage.js');
35const BaseDbObject = require('./dbObject.js');
36const transformer = require('./transformer.js');
37const types = require('./types.js');
38
39class AqQueue {
40
41 //---------------------------------------------------------------------------
42 // _isPayload()
43 //
44 // Returns a boolean indicating if the value is a valid payload.
45 //---------------------------------------------------------------------------
46 _isPayload(value) {
47 return (typeof value === 'string' || Buffer.isBuffer(value) ||
48 value instanceof BaseDbObject);
49 }
50
51 //---------------------------------------------------------------------------
52 // _makeMessage()
53 //
54 // For enqOne()/deqOne()/enqMany()/deqMany(), wrap the return value with JS
55 // layer object.
56 //---------------------------------------------------------------------------
57 _makeMessage(msgImpl) {
58 const msg = new AqMessage();
59 msg._impl = msgImpl;
60 msg._payloadTypeClass = this._payloadTypeClass;
61 return msg;
62 }
63
64 //---------------------------------------------------------------------------
65 // _verifyMessage()
66 //
67 // Messages that can be enqueued must be a string, Buffer or database object
68 // (in which case all message properties are defaulted) or an object
69 // containing a "payload" property along with the other properties to use
70 // during the enqueue. A normalized object is returned.
71 //---------------------------------------------------------------------------
72 _verifyMessage(message) {
73
74 // validate we have a payload of the correct type
75 let payload;
76 if (this._isPayload(message)) {
77 payload = message;
78 message = {};
79 } else {
80 message = {...message};
81 if (this._isJson || this._isPayload(message.payload)) {
82 payload = message.payload;
83 } else if (this._payloadTypeClass) {
84 payload = new this._payloadTypeClass(message.payload);
85 } else {
86 errors.throwErr(errors.ERR_INVALID_AQ_MESSAGE);
87 }
88 }
89
90 // validate payload
91 if (this._isJson) {
92 message.payload = transformer.transformJsonValue(payload);
93 } else if (typeof payload === 'string') {
94 message.payload = Buffer.from(payload);
95 } else if (Buffer.isBuffer(payload)) {
96 message.payload = payload;
97 } else {
98 message.payload = payload._impl;
99 }
100
101 // validate options, if applicable
102 if (message.correlation !== undefined) {
103 errors.assertParamPropValue(typeof message.correlation === 'string', 1,
104 "correlation");
105 }
106 if (message.delay !== undefined) {
107 errors.assertParamPropValue(Number.isInteger(message.delay), 1, "delay");
108 }
109 if (message.exceptionQueue !== undefined) {
110 errors.assertParamPropValue(typeof message.exceptionQueue === 'string',
111 1, "exceptionQueue");
112 }
113 if (message.expiration !== undefined) {
114 errors.assertParamPropValue(Number.isInteger(message.expiration), 1,
115 "expiration");
116 }
117 if (message.priority !== undefined) {
118 errors.assertParamPropValue(Number.isInteger(message.priority), 1,
119 "priority");
120 }
121 if (message.recipients !== undefined) {
122 errors.assertParamPropValue(nodbUtil.isArrayOfStrings(message.recipients),
123 1, "recipients");
124 }
125
126 return message;
127 }
128
129 //---------------------------------------------------------------------------
130 // create()
131 //
132 // Creates the queue and populates some internal attributes.
133 //---------------------------------------------------------------------------
134 async create(conn, name, options) {
135 if (options.payloadType === types.DB_TYPE_JSON) {
136 this._isJson = true;
137 this._payloadType = types.DB_TYPE_JSON;
138 this._payloadTypeName = "JSON";
139 } else if (options.payloadType === undefined ||
140 options.payloadType === types.DB_TYPE_RAW) {
141 this._payloadType = types.DB_TYPE_RAW;
142 this._payloadTypeName = "RAW";
143 } else {
144 if (typeof options.payloadType === 'string') {
145 // DB Object type
146 const cls = await conn._getDbObjectClassForName(options.payloadType);
147 this._payloadTypeClass = cls;
148 options.payloadType = cls;
149 } else {
150 errors.assertParamPropValue(nodbUtil.isObject(options.payloadType) &&
151 options.payloadType.prototype instanceof BaseDbObject, 2, "payloadType");
152 this._payloadTypeClass = options.payloadType;
153 }
154 this._payloadType = types.DB_TYPE_OBJECT;
155 this._payloadTypeName = this._payloadTypeClass.prototype.name;
156 }
157 this._name = name;
158 this._impl = await conn._impl.getQueue(name, this._payloadTypeClass,
159 this._payloadType);
160 }
161
162 //---------------------------------------------------------------------------
163 // deqMany()
164 //
165 // Returns an array of messages from the queue, up to the maximum specified,
166 // if any are available.
167 //---------------------------------------------------------------------------
168 async deqMany(maxMessages) {
169 errors.assertArgCount(arguments, 1, 1);
170 errors.assertParamValue(Number.isInteger(maxMessages) && maxMessages > 0,
171 1);
172 const msgImpls = await this._impl.deq(maxMessages);
173 return msgImpls.map(i => this._makeMessage(i));
174 }
175
176 //---------------------------------------------------------------------------
177 // deqOne()
178 //
179 // Returns a single message from the queue, if one is available.
180 //---------------------------------------------------------------------------
181 async deqOne() {
182 errors.assertArgCount(arguments, 0, 0);
183 const msgImpls = await this._impl.deq(1);
184 if (msgImpls)
185 return this._makeMessage(msgImpls[0]);
186 }
187
188 //---------------------------------------------------------------------------
189 // deqOptions
190 //
191 // Property for the dequeue options associated with the queue.
192 //---------------------------------------------------------------------------
193 get deqOptions() {
194 if (!this._deqOptions) {
195 const deqOptions = new AqDeqOptions();
196 deqOptions._impl = this._impl.deqOptions;
197 this._deqOptions = deqOptions;
198 }
199 return this._deqOptions;
200 }
201
202 //---------------------------------------------------------------------------
203 // enqMany()
204 //
205 // Enqueues multiple messages into the queue at the same time, avoiding
206 // multiple round-trips.
207 //---------------------------------------------------------------------------
208 async enqMany(messages) {
209 errors.assertArgCount(arguments, 1, 1);
210 errors.assertParamValue(Array.isArray(messages) && messages.length > 0, 1);
211 const verifiedMessages = new Array(messages.length);
212 for (let i = 0; i < messages.length; i++) {
213 verifiedMessages[i] = this._verifyMessage(messages[i]);
214 }
215 const msgImpls = await this._impl.enq(verifiedMessages);
216 return msgImpls.map(i => this._makeMessage(i));
217 }
218
219 //---------------------------------------------------------------------------
220 // enqOne()
221 //
222 // Enqueues a single message into the queue.
223 //---------------------------------------------------------------------------
224 async enqOne(message) {
225 errors.assertArgCount(arguments, 1, 1);
226 message = this._verifyMessage(message);
227 const msgImpls = await this._impl.enq([message]);
228 return this._makeMessage(msgImpls[0]);
229 }
230
231 //---------------------------------------------------------------------------
232 // enqOptions
233 //
234 // Property for the enqueue options associated with the queue.
235 //---------------------------------------------------------------------------
236 get enqOptions() {
237 if (!this._enqOptions) {
238 const enqOptions = new AqEnqOptions();
239 enqOptions._impl = this._impl.enqOptions;
240 this._enqOptions = enqOptions;
241 }
242 return this._enqOptions;
243 }
244
245 //---------------------------------------------------------------------------
246 // name
247 //
248 // Property for the name of the queue.
249 //---------------------------------------------------------------------------
250 get name() {
251 return this._name;
252 }
253
254 //---------------------------------------------------------------------------
255 // payloadType
256 //
257 // Property for the payload type.
258 //---------------------------------------------------------------------------
259 get payloadType() {
260 return this._payloadType;
261 }
262
263 //---------------------------------------------------------------------------
264 // payloadTypeName
265 //
266 // Property for the payload type name.
267 //---------------------------------------------------------------------------
268 get payloadTypeName() {
269 return this._payloadTypeName;
270 }
271
272 //---------------------------------------------------------------------------
273 // payloadTypeClass
274 //
275 // Property for the payload type class.
276 //---------------------------------------------------------------------------
277 get payloadTypeClass() {
278 return this._payloadTypeClass;
279 }
280
281}
282
283nodbUtil.wrapFns(AqQueue.prototype,
284 "deqOne",
285 "deqMany",
286 "enqOne",
287 "enqMany");
288
289module.exports = AqQueue;