UNPKG

7.35 kBJavaScriptView Raw
1import { isAsyncIterable } from 'iterall';
2import inspect from '../jsutils/inspect';
3import { addPath, pathToArray } from '../jsutils/Path';
4import { GraphQLError } from '../error/GraphQLError';
5import { locatedError } from '../error/locatedError';
6import { assertValidExecutionArguments, buildExecutionContext, buildResolveInfo, collectFields, execute, getFieldDef, resolveFieldValueOrError } from '../execution/execute';
7import { getOperationRootType } from '../utilities/getOperationRootType';
8import mapAsyncIterator from './mapAsyncIterator';
9export function subscribe(argsOrSchema, document, rootValue, contextValue, variableValues, operationName, fieldResolver, subscribeFieldResolver) {
10 /* eslint-enable no-redeclare */
11 // Extract arguments from object args if provided.
12 return arguments.length === 1 ? subscribeImpl(argsOrSchema) : subscribeImpl({
13 schema: argsOrSchema,
14 document: document,
15 rootValue: rootValue,
16 contextValue: contextValue,
17 variableValues: variableValues,
18 operationName: operationName,
19 fieldResolver: fieldResolver,
20 subscribeFieldResolver: subscribeFieldResolver
21 });
22}
23/**
24 * This function checks if the error is a GraphQLError. If it is, report it as
25 * an ExecutionResult, containing only errors and no data. Otherwise treat the
26 * error as a system-class error and re-throw it.
27 */
28
29function reportGraphQLError(error) {
30 if (error instanceof GraphQLError) {
31 return {
32 errors: [error]
33 };
34 }
35
36 throw error;
37}
38
39function subscribeImpl(args) {
40 var schema = args.schema,
41 document = args.document,
42 rootValue = args.rootValue,
43 contextValue = args.contextValue,
44 variableValues = args.variableValues,
45 operationName = args.operationName,
46 fieldResolver = args.fieldResolver,
47 subscribeFieldResolver = args.subscribeFieldResolver;
48 var sourcePromise = createSourceEventStream(schema, document, rootValue, contextValue, variableValues, operationName, subscribeFieldResolver); // For each payload yielded from a subscription, map it over the normal
49 // GraphQL `execute` function, with `payload` as the rootValue.
50 // This implements the "MapSourceToResponseEvent" algorithm described in
51 // the GraphQL specification. The `execute` function provides the
52 // "ExecuteSubscriptionEvent" algorithm, as it is nearly identical to the
53 // "ExecuteQuery" algorithm, for which `execute` is also used.
54
55 var mapSourceToResponse = function mapSourceToResponse(payload) {
56 return execute(schema, document, payload, contextValue, variableValues, operationName, fieldResolver);
57 }; // Resolve the Source Stream, then map every source value to a
58 // ExecutionResult value as described above.
59
60
61 return sourcePromise.then(function (resultOrStream) {
62 return (// Note: Flow can't refine isAsyncIterable, so explicit casts are used.
63 isAsyncIterable(resultOrStream) ? mapAsyncIterator(resultOrStream, mapSourceToResponse, reportGraphQLError) : resultOrStream
64 );
65 });
66}
67/**
68 * Implements the "CreateSourceEventStream" algorithm described in the
69 * GraphQL specification, resolving the subscription source event stream.
70 *
71 * Returns a Promise which resolves to either an AsyncIterable (if successful)
72 * or an ExecutionResult (error). The promise will be rejected if the schema or
73 * other arguments to this function are invalid, or if the resolved event stream
74 * is not an async iterable.
75 *
76 * If the client-provided arguments to this function do not result in a
77 * compliant subscription, a GraphQL Response (ExecutionResult) with
78 * descriptive errors and no data will be returned.
79 *
80 * If the the source stream could not be created due to faulty subscription
81 * resolver logic or underlying systems, the promise will resolve to a single
82 * ExecutionResult containing `errors` and no `data`.
83 *
84 * If the operation succeeded, the promise resolves to the AsyncIterable for the
85 * event stream returned by the resolver.
86 *
87 * A Source Event Stream represents a sequence of events, each of which triggers
88 * a GraphQL execution for that event.
89 *
90 * This may be useful when hosting the stateful subscription service in a
91 * different process or machine than the stateless GraphQL execution engine,
92 * or otherwise separating these two steps. For more on this, see the
93 * "Supporting Subscriptions at Scale" information in the GraphQL specification.
94 */
95
96
97export function createSourceEventStream(schema, document, rootValue, contextValue, variableValues, operationName, fieldResolver) {
98 // If arguments are missing or incorrectly typed, this is an internal
99 // developer mistake which should throw an early error.
100 assertValidExecutionArguments(schema, document, variableValues);
101
102 try {
103 // If a valid context cannot be created due to incorrect arguments,
104 // this will throw an error.
105 var exeContext = buildExecutionContext(schema, document, rootValue, contextValue, variableValues, operationName, fieldResolver); // Return early errors if execution context failed.
106
107 if (Array.isArray(exeContext)) {
108 return Promise.resolve({
109 errors: exeContext
110 });
111 }
112
113 var type = getOperationRootType(schema, exeContext.operation);
114 var fields = collectFields(exeContext, type, exeContext.operation.selectionSet, Object.create(null), Object.create(null));
115 var responseNames = Object.keys(fields);
116 var responseName = responseNames[0];
117 var fieldNodes = fields[responseName];
118 var fieldNode = fieldNodes[0];
119 var fieldName = fieldNode.name.value;
120 var fieldDef = getFieldDef(schema, type, fieldName);
121
122 if (!fieldDef) {
123 throw new GraphQLError("The subscription field \"".concat(fieldName, "\" is not defined."), fieldNodes);
124 } // Call the `subscribe()` resolver or the default resolver to produce an
125 // AsyncIterable yielding raw payloads.
126
127
128 var resolveFn = fieldDef.subscribe || exeContext.fieldResolver;
129 var path = addPath(undefined, responseName);
130 var info = buildResolveInfo(exeContext, fieldDef, fieldNodes, type, path); // resolveFieldValueOrError implements the "ResolveFieldEventStream"
131 // algorithm from GraphQL specification. It differs from
132 // "ResolveFieldValue" due to providing a different `resolveFn`.
133
134 var result = resolveFieldValueOrError(exeContext, fieldDef, fieldNodes, resolveFn, rootValue, info); // Coerce to Promise for easier error handling and consistent return type.
135
136 return Promise.resolve(result).then(function (eventStream) {
137 // If eventStream is an Error, rethrow a located error.
138 if (eventStream instanceof Error) {
139 return {
140 errors: [locatedError(eventStream, fieldNodes, pathToArray(path))]
141 };
142 } // Assert field returned an event stream, otherwise yield an error.
143
144
145 if (isAsyncIterable(eventStream)) {
146 // Note: isAsyncIterable above ensures this will be correct.
147 return eventStream;
148 }
149
150 throw new Error('Subscription field must return Async Iterable. Received: ' + inspect(eventStream));
151 });
152 } catch (error) {
153 // As with reportGraphQLError above, if the error is a GraphQLError, report
154 // it as an ExecutionResult; otherwise treat it as a system-class error and
155 // re-throw it.
156 return error instanceof GraphQLError ? Promise.resolve({
157 errors: [error]
158 }) : Promise.reject(error);
159 }
160}