1 | // Copyright (c) 2019, 2024, Oracle and/or its affiliates.
|
2 |
|
3 | //-----------------------------------------------------------------------------
|
4 | //
|
5 | // This software is dual-licensed to you under the Universal Permissive License
|
6 | // (UPL) 1.0 as shown at https://oss.oracle.com/licenses/upl and Apache License
|
7 | // 2.0 as shown at http://www.apache.org/licenses/LICENSE-2.0. You may choose
|
8 | // either license.
|
9 | //
|
10 | // If you elect to accept the software under the Apache License, Version 2.0,
|
11 | // the following applies:
|
12 | //
|
13 | // Licensed under the Apache License, Version 2.0 (the "License");
|
14 | // you may not use this file except in compliance with the License.
|
15 | // You may obtain a copy of the License at
|
16 | //
|
17 | // https://www.apache.org/licenses/LICENSE-2.0
|
18 | //
|
19 | // Unless required by applicable law or agreed to in writing, software
|
20 | // distributed under the License is distributed on an "AS IS" BASIS,
|
21 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
22 | // See the License for the specific language governing permissions and
|
23 | // limitations under the License.
|
24 | //
|
25 | //-----------------------------------------------------------------------------
|
26 |
|
27 | ;
|
28 |
|
29 | const { Buffer } = require('buffer');
|
30 | const errors = require('./errors.js');
|
31 | const nodbUtil = require('./util.js');
|
32 | const AqDeqOptions = require('./aqDeqOptions.js');
|
33 | const AqEnqOptions = require('./aqEnqOptions.js');
|
34 | const AqMessage = require('./aqMessage.js');
|
35 | const BaseDbObject = require('./dbObject.js');
|
36 | const transformer = require('./transformer.js');
|
37 | const types = require('./types.js');
|
38 |
|
class AqQueue {

  //---------------------------------------------------------------------------
  // _isPayload()
  //
  // Returns a boolean indicating if the value is a valid payload, i.e. a
  // string, a Buffer or an instance of a database object.
  //---------------------------------------------------------------------------
  _isPayload(value) {
    return (typeof value === 'string' || Buffer.isBuffer(value) ||
      value instanceof BaseDbObject);
  }

  //---------------------------------------------------------------------------
  // _makeMessage()
  //
  // For enqOne()/deqOne()/enqMany()/deqMany(), wrap the implementation level
  // message with a JS layer AqMessage object. The payload type class of the
  // queue is copied onto the message.
  //---------------------------------------------------------------------------
  _makeMessage(msgImpl) {
    const msg = new AqMessage();
    msg._impl = msgImpl;
    msg._payloadTypeClass = this._payloadTypeClass;
    return msg;
  }

  //---------------------------------------------------------------------------
  // _verifyMessage()
  //
  // Messages that can be enqueued must be a string, Buffer or database object
  // (in which case all message properties are defaulted) or an object
  // containing a "payload" property along with the other properties to use
  // during the enqueue. A normalized object is returned; the object supplied
  // by the caller is never modified (a shallow copy is made).
  //
  // An error is raised through the errors module when no usable payload can
  // be determined or when one of the optional properties has the wrong type.
  //---------------------------------------------------------------------------
  _verifyMessage(message) {

    // validate we have a payload of the correct type
    let payload;
    if (this._isPayload(message)) {
      payload = message;
      message = {};
    } else {
      // shallow copy so the normalized payload assigned below does not leak
      // into the caller's object
      message = {...message};
      if (this._isJson || this._isPayload(message.payload)) {
        payload = message.payload;
      } else if (this._payloadTypeClass) {
        // let the queue's database object class build the payload from the
        // value supplied
        payload = new this._payloadTypeClass(message.payload);
      } else {
        errors.throwErr(errors.ERR_INVALID_AQ_MESSAGE);
      }
    }

    // normalize payload into the form handed to the implementation
    if (this._isJson) {
      message.payload = transformer.transformJsonValue(payload);
    } else if (typeof payload === 'string') {
      message.payload = Buffer.from(payload);
    } else if (Buffer.isBuffer(payload)) {
      message.payload = payload;
    } else {
      // database object: pass its implementation object
      message.payload = payload._impl;
    }

    // validate options, if applicable
    if (message.correlation !== undefined) {
      errors.assertParamPropValue(typeof message.correlation === 'string', 1,
        "correlation");
    }
    if (message.delay !== undefined) {
      errors.assertParamPropValue(Number.isInteger(message.delay), 1, "delay");
    }
    if (message.exceptionQueue !== undefined) {
      errors.assertParamPropValue(typeof message.exceptionQueue === 'string',
        1, "exceptionQueue");
    }
    if (message.expiration !== undefined) {
      errors.assertParamPropValue(Number.isInteger(message.expiration), 1,
        "expiration");
    }
    if (message.priority !== undefined) {
      errors.assertParamPropValue(Number.isInteger(message.priority), 1,
        "priority");
    }
    if (message.recipients !== undefined) {
      errors.assertParamPropValue(nodbUtil.isArrayOfStrings(message.recipients),
        1, "recipients");
    }

    return message;
  }

  //---------------------------------------------------------------------------
  // create()
  //
  // Creates the queue and populates some internal attributes. The payload
  // type is taken from options.payloadType: the JSON type, the RAW type (also
  // used when no payload type is given), the name of a database object type
  // (resolved to its class through the connection) or a database object
  // class. When a name is given, options.payloadType is replaced with the
  // resolved class. If options is omitted, an empty object is assumed.
  //---------------------------------------------------------------------------
  async create(conn, name, options = {}) {
    if (options.payloadType === types.DB_TYPE_JSON) {
      this._isJson = true;
      this._payloadType = types.DB_TYPE_JSON;
      this._payloadTypeName = "JSON";
    } else if (options.payloadType === undefined ||
        options.payloadType === types.DB_TYPE_RAW) {
      this._payloadType = types.DB_TYPE_RAW;
      this._payloadTypeName = "RAW";
    } else {
      if (typeof options.payloadType === 'string') {
        // DB Object type
        const cls = await conn._getDbObjectClassForName(options.payloadType);
        this._payloadTypeClass = cls;
        options.payloadType = cls;
      } else {
        errors.assertParamPropValue(nodbUtil.isObject(options.payloadType) &&
          options.payloadType.prototype instanceof BaseDbObject, 2, "payloadType");
        this._payloadTypeClass = options.payloadType;
      }
      this._payloadType = types.DB_TYPE_OBJECT;
      this._payloadTypeName = this._payloadTypeClass.prototype.name;
    }
    this._name = name;
    this._impl = await conn._impl.getQueue(name, this._payloadTypeClass,
      this._payloadType);
  }

  //---------------------------------------------------------------------------
  // deqMany()
  //
  // Returns an array of messages from the queue, up to the maximum specified,
  // if any are available. The maximum must be a positive integer.
  //---------------------------------------------------------------------------
  async deqMany(maxMessages) {
    errors.assertArgCount(arguments, 1, 1);
    errors.assertParamValue(Number.isInteger(maxMessages) && maxMessages > 0,
      1);
    const msgImpls = await this._impl.deq(maxMessages);
    return msgImpls.map(i => this._makeMessage(i));
  }

  //---------------------------------------------------------------------------
  // deqOne()
  //
  // Returns a single message from the queue, if one is available. If the
  // implementation returns nothing or an empty array, undefined is returned.
  //---------------------------------------------------------------------------
  async deqOne() {
    errors.assertArgCount(arguments, 0, 0);
    const msgImpls = await this._impl.deq(1);
    // an empty array is truthy, so the length must be checked as well;
    // otherwise a message wrapping an undefined implementation is returned
    if (msgImpls && msgImpls.length > 0) {
      return this._makeMessage(msgImpls[0]);
    }
    return undefined;
  }

  //---------------------------------------------------------------------------
  // deqOptions
  //
  // Property for the dequeue options associated with the queue. The wrapper
  // object is created on first access and reused afterwards.
  //---------------------------------------------------------------------------
  get deqOptions() {
    if (!this._deqOptions) {
      const deqOptions = new AqDeqOptions();
      deqOptions._impl = this._impl.deqOptions;
      this._deqOptions = deqOptions;
    }
    return this._deqOptions;
  }

  //---------------------------------------------------------------------------
  // enqMany()
  //
  // Enqueues multiple messages into the queue at the same time, avoiding
  // multiple round-trips. The argument must be a non-empty array; each
  // element is verified with _verifyMessage() before anything is enqueued.
  //---------------------------------------------------------------------------
  async enqMany(messages) {
    errors.assertArgCount(arguments, 1, 1);
    errors.assertParamValue(Array.isArray(messages) && messages.length > 0, 1);
    const verifiedMessages = new Array(messages.length);
    for (let i = 0; i < messages.length; i++) {
      verifiedMessages[i] = this._verifyMessage(messages[i]);
    }
    const msgImpls = await this._impl.enq(verifiedMessages);
    return msgImpls.map(i => this._makeMessage(i));
  }

  //---------------------------------------------------------------------------
  // enqOne()
  //
  // Enqueues a single message into the queue and returns the resulting
  // message wrapped in a JS layer object.
  //---------------------------------------------------------------------------
  async enqOne(message) {
    errors.assertArgCount(arguments, 1, 1);
    message = this._verifyMessage(message);
    const msgImpls = await this._impl.enq([message]);
    return this._makeMessage(msgImpls[0]);
  }

  //---------------------------------------------------------------------------
  // enqOptions
  //
  // Property for the enqueue options associated with the queue. The wrapper
  // object is created on first access and reused afterwards.
  //---------------------------------------------------------------------------
  get enqOptions() {
    if (!this._enqOptions) {
      const enqOptions = new AqEnqOptions();
      enqOptions._impl = this._impl.enqOptions;
      this._enqOptions = enqOptions;
    }
    return this._enqOptions;
  }

  //---------------------------------------------------------------------------
  // name
  //
  // Property for the name of the queue.
  //---------------------------------------------------------------------------
  get name() {
    return this._name;
  }

  //---------------------------------------------------------------------------
  // payloadType
  //
  // Property for the payload type.
  //---------------------------------------------------------------------------
  get payloadType() {
    return this._payloadType;
  }

  //---------------------------------------------------------------------------
  // payloadTypeName
  //
  // Property for the payload type name.
  //---------------------------------------------------------------------------
  get payloadTypeName() {
    return this._payloadTypeName;
  }

  //---------------------------------------------------------------------------
  // payloadTypeClass
  //
  // Property for the payload type class.
  //---------------------------------------------------------------------------
  get payloadTypeClass() {
    return this._payloadTypeClass;
  }

}
|
282 |
|
// Pass the asynchronous public methods of the queue to the shared utility
// wrapper (see util.js for what wrapping entails), then export the class.
const wrappedMethodNames = ["deqOne", "deqMany", "enqOne", "enqMany"];
nodbUtil.wrapFns(AqQueue.prototype, ...wrappedMethodNames);

module.exports = AqQueue;
|