// Source: UNPKG file view (JavaScript, 10.1 kB).

1var ot = require('./ot');
2var projections = require('./projections');
3var ShareDBError = require('./error');
4
5var ERROR_CODE = ShareDBError.CODES;
6
/**
 * Holds the state of one op submission while it is fetched, transformed,
 * applied and committed.
 *
 * @param {Object} backend - Backend instance. `projections`,
 *   `suppressPublish` and `maxSubmitRetries` are read from it here.
 * @param {Object} agent - Agent that is handed to the middleware triggers.
 * @param {string} index - Name the op was submitted against; may be a key of
 *   `backend.projections` (presumably a string name).
 * @param {*} id - Document id.
 * @param {Object} op - Submitted op. It is mutated: an `m` metadata object is
 *   attached via `_addOpMeta`.
 * @param {Object} options - Stored as-is and later passed to `db.commit`.
 */
function SubmitRequest(backend, agent, index, id, op, options) {
  this.backend = backend;
  this.agent = agent;
  // If a projection, rewrite the call into a call against the collection
  var projection = backend.projections[index];
  this.index = index;
  this.projection = projection;
  this.collection = (projection) ? projection.target : index;
  this.id = id;
  this.op = op;
  this.options = options;

  // `start` must be set before `_addOpMeta`, which copies it into `op.m.ts`
  this.start = Date.now();
  this._addOpMeta();

  // Set as this request is sent through middleware
  this.action = null;
  // For custom use in middleware
  this.custom = {};

  // Whether or not to store a milestone snapshot. If left as null, the milestone
  // snapshots are saved according to the interval provided to the milestone db
  // options. If overridden to a boolean value, then that value is used instead of
  // the interval logic.
  this.saveMilestoneSnapshot = null;
  this.suppressPublish = backend.suppressPublish;
  this.maxRetries = backend.maxSubmitRetries;
  this.retries = 0;

  // return values
  this.snapshot = null;
  this.ops = [];
  this.channels = null;
}
41module.exports = SubmitRequest;
42
/**
 * Fetches the current snapshot, brings the op up to the snapshot's version
 * (transforming it against the intervening ops when needed) and then hands
 * over to `apply`.
 *
 * @param {Function} callback - Called with an error when the submit fails at
 *   this stage; otherwise it is passed on to `apply`.
 */
SubmitRequest.prototype.submit = function(callback) {
  var request = this;
  var backend = this.backend;
  var collection = this.collection;
  var id = this.id;
  var op = this.op;
  // Send a special projection so that getSnapshot knows to return all fields.
  // With a null projection, it strips document metadata
  var fields = {$submit: true};

  backend.db.getSnapshot(collection, id, fields, null, function(err, snapshot) {
    if (err) return callback(err);

    request.snapshot = snapshot;
    request._addSnapshotMeta();

    if (op.v == null) {
      if (op.create && snapshot.type && op.src) {
        // If the document was already created by another op, we will return a
        // 'Document already exists' error in response and fail to submit this
        // op. However, this could also happen in the case that the op was
        // already committed and the create op was simply resent. In that
        // case, we should return a non-fatal 'Op already submitted' error. We
        // must get the past ops and check their src and seq values to
        // differentiate.
        backend.db.getCommittedOpVersion(collection, id, snapshot, op, null, function(err, version) {
          if (err) return callback(err);
          if (version == null) {
            callback(request.alreadyCreatedError());
          } else {
            op.v = version;
            callback(request.alreadySubmittedError());
          }
        });
        return;
      }

      // Submitting an op with a null version means that it should get the
      // version from the latest snapshot. Generally this will mean the op
      // won't be transformed, though transform could be called on it in the
      // case of a retry from a simultaneous submit
      op.v = snapshot.v;
    }

    if (op.v === snapshot.v) {
      // The snapshot hasn't changed since the op's base version. Apply
      // without transforming the op
      return request.apply(callback);
    }

    if (op.v > snapshot.v) {
      // The op version should be from a previous snapshot, so it should
      // never exceed the current snapshot's version
      return callback(request.newerVersionError());
    }

    // Transform the op up to the current snapshot version, then apply
    var from = op.v;
    backend.db.getOpsToSnapshot(collection, id, from, snapshot, null, function(err, ops) {
      if (err) return callback(err);

      // One op is expected per version between `from` and the snapshot
      if (ops.length !== snapshot.v - from) {
        return callback(request.missingOpsError());
      }

      err = request._transformOp(ops);
      if (err) return callback(err);

      if (op.v !== snapshot.v) {
        // This shouldn't happen, but is just a final sanity check to make
        // sure we have transformed the op to the current snapshot version
        return callback(request.versionAfterTransformError());
      }

      request.apply(callback);
    });
  });
};
121
/**
 * Checks the op against the projection (if any), runs the 'apply' middleware,
 * applies the op to the fetched snapshot and proceeds to `commit`.
 *
 * @param {Function} callback - Called with an error if the projection check,
 *   the middleware or `ot.apply` fails; otherwise passed on to `commit`.
 */
SubmitRequest.prototype.apply = function(callback) {
  // If we're being projected, verify that the op is allowed
  var projection = this.projection;
  if (projection && !projections.isOpAllowed(this.snapshot.type, projection.fields, this.op)) {
    return callback(this.projectionError());
  }

  // Always set the channels before each attempt to apply. If the channels are
  // modified in a middleware and we retry, we want to reset to a new array
  this.channels = this.backend.getChannels(this.collection, this.id);

  var request = this;
  this.backend.trigger('apply', this.agent, this, function(err) {
    if (err) return callback(err);

    // Apply the submitted op to the snapshot
    err = ot.apply(request.snapshot, request.op);
    if (err) return callback(err);

    request.commit(callback);
  });
};
144
/**
 * Runs the 'commit' middleware, writes the op and snapshot through
 * `backend.db.commit`, and on success publishes the op and optionally saves a
 * milestone snapshot. An unsuccessful (non-error) commit triggers `retry`.
 *
 * @param {Function} callback - Called with an error on failure and with no
 *   arguments once the commit has succeeded.
 */
SubmitRequest.prototype.commit = function(callback) {
  var request = this;
  var backend = this.backend;
  backend.trigger('commit', this.agent, this, function(err) {
    if (err) return callback(err);

    // Try committing the operation and snapshot to the database atomically
    backend.db.commit(
      request.collection,
      request.id,
      request.op,
      request.snapshot,
      request.options,
      function(err, succeeded) {
        if (err) return callback(err);
        if (!succeeded) {
          // Between our fetch and our call to commit, another client committed an
          // operation. We expect this to be relatively infrequent but normal.
          return request.retry(callback);
        }
        if (!request.suppressPublish) {
          var op = request.op;
          op.c = request.collection;
          op.d = request.id;
          // The published op carries no metadata
          op.m = undefined;
          // Needed for agent to detect if it can ignore sending the op back to
          // the client that submitted it in subscriptions
          if (request.collection !== request.index) op.i = request.index;
          backend.pubsub.publish(request.channels, op);
        }
        if (request._shouldSaveMilestoneSnapshot(request.snapshot)) {
          request.backend.milestoneDb.saveMilestoneSnapshot(request.collection, request.snapshot);
        }
        callback();
      });
  });
};
182
/**
 * Counts a retry and re-runs `submit`, unless the retry limit is exceeded.
 * Emits a 'timing' event ('submit.retry') with the time elapsed since the
 * request started.
 *
 * @param {Function} callback - Receives `maxRetriesError()` once `retries`
 *   exceeds a non-null `maxRetries`; otherwise passed on to `submit`.
 */
SubmitRequest.prototype.retry = function(callback) {
  this.retries++;
  // A null/undefined maxRetries means there is no limit
  if (this.maxRetries != null && this.retries > this.maxRetries) {
    return callback(this.maxRetriesError());
  }
  this.backend.emit('timing', 'submit.retry', Date.now() - this.start, this);
  this.submit(callback);
};
191
/**
 * Transforms `this.op` against each op in `ops`, in order, recording every op
 * it was transformed against in `this.ops`.
 *
 * @param {Array} ops - Ops to transform against.
 * @returns {*} An error as soon as one is hit (duplicate submission, version
 *   mismatch, or whatever `ot.transform` returns); otherwise undefined.
 */
SubmitRequest.prototype._transformOp = function(ops) {
  var type = this.snapshot.type;
  for (var i = 0; i < ops.length; i++) {
    var op = ops[i];

    if (this.op.src && this.op.src === op.src && this.op.seq === op.seq) {
      // The op has already been submitted. There are a variety of ways this
      // can happen in normal operation, such as a client resending an
      // unacknowledged operation at reconnect. It's important we don't apply
      // the same op twice
      return this.alreadySubmittedError();
    }

    if (this.op.v !== op.v) {
      return this.versionDuringTransformError();
    }

    var err = ot.transform(type, this.op, op);
    if (err) return err;
    this.ops.push(op);
  }
};
214
/**
 * Replaces the op's metadata with `{ts: this.start}` and, for create ops,
 * normalizes the declared type via `ot.normalizeType`.
 */
SubmitRequest.prototype._addOpMeta = function() {
  this.op.m = {
    ts: this.start
  };
  if (this.op.create) {
    // Consistently store the full URI of the type, not just its short name
    this.op.create.type = ot.normalizeType(this.op.create.type);
  }
};
224
/**
 * Stamps the fetched snapshot's metadata (creating `snapshot.m` if absent):
 * `ctime` for create ops and `mtime` always, both set to `this.start`. For
 * delete ops the snapshot's current data is copied into `op.m.data`.
 */
SubmitRequest.prototype._addSnapshotMeta = function() {
  var meta = this.snapshot.m || (this.snapshot.m = {});
  if (this.op.create) {
    meta.ctime = this.start;
  } else if (this.op.del) {
    this.op.m.data = this.snapshot.data;
  }
  meta.mtime = this.start;
};
234
/**
 * Decides whether a milestone snapshot should be saved for this commit.
 *
 * @param {Object} snapshot - The committed snapshot.
 * @returns {*} `this.saveMilestoneSnapshot` when it has been overridden;
 *   otherwise a truthy value only when a snapshot is given and its version is
 *   a multiple of `backend.milestoneDb.interval`.
 */
SubmitRequest.prototype._shouldSaveMilestoneSnapshot = function(snapshot) {
  // If the flag is null, it's not been overridden by the consumer, so apply the interval
  if (this.saveMilestoneSnapshot === null) {
    return snapshot && snapshot.v % this.backend.milestoneDb.interval === 0;
  }

  return this.saveMilestoneSnapshot;
};
243
// Non-fatal client errors:

/** @returns {ShareDBError} Error reporting that this op was already submitted. */
SubmitRequest.prototype.alreadySubmittedError = function() {
  return new ShareDBError(ERROR_CODE.ERR_OP_ALREADY_SUBMITTED, 'Op already submitted');
};
/** @returns {ShareDBError} Error reporting that the op submit was rejected. */
SubmitRequest.prototype.rejectedError = function() {
  return new ShareDBError(ERROR_CODE.ERR_OP_SUBMIT_REJECTED, 'Op submit rejected');
};
// Fatal client errors:

/** @returns {ShareDBError} Error for a create op against an existing document. */
SubmitRequest.prototype.alreadyCreatedError = function() {
  return new ShareDBError(ERROR_CODE.ERR_DOC_ALREADY_CREATED, 'Invalid op submitted. Document already created');
};
/** @returns {ShareDBError} Error for an op whose version exceeds the snapshot's. */
SubmitRequest.prototype.newerVersionError = function() {
  return new ShareDBError(
    ERROR_CODE.ERR_OP_VERSION_NEWER_THAN_CURRENT_SNAPSHOT,
    'Invalid op submitted. Op version newer than current snapshot'
  );
};
/** @returns {ShareDBError} Error for an op the projection does not allow. */
SubmitRequest.prototype.projectionError = function() {
  return new ShareDBError(
    ERROR_CODE.ERR_OP_NOT_ALLOWED_IN_PROJECTION,
    'Invalid op submitted. Operation invalid in projected collection'
  );
};
// Fatal internal errors:

/** @returns {ShareDBError} Error for ops missing from the DB during transform. */
SubmitRequest.prototype.missingOpsError = function() {
  return new ShareDBError(
    ERROR_CODE.ERR_SUBMIT_TRANSFORM_OPS_NOT_FOUND,
    'Op submit failed. DB missing ops needed to transform it up to the current snapshot version'
  );
};
/** @returns {ShareDBError} Error for a version mismatch found while transforming. */
SubmitRequest.prototype.versionDuringTransformError = function() {
  return new ShareDBError(
    ERROR_CODE.ERR_OP_VERSION_MISMATCH_DURING_TRANSFORM,
    'Op submit failed. Versions mismatched during op transform'
  );
};
/** @returns {ShareDBError} Error for an op still off-version after transform. */
SubmitRequest.prototype.versionAfterTransformError = function() {
  return new ShareDBError(
    ERROR_CODE.ERR_OP_VERSION_MISMATCH_AFTER_TRANSFORM,
    'Op submit failed. Op version mismatches snapshot after op transform'
  );
};
/**
 * @returns {ShareDBError} Error reporting that the retry limit was exceeded;
 *   the message ends with the current `this.maxRetries` value.
 */
SubmitRequest.prototype.maxRetriesError = function() {
  return new ShareDBError(
    ERROR_CODE.ERR_MAX_SUBMIT_RETRIES_EXCEEDED,
    'Op submit failed. Exceeded max submit retries of ' + this.maxRetries
  );
};