UNPKG

6.46 kBJavaScriptView Raw
1/* global awslambda */ import { Readable } from 'node:stream';
2import { pipeline } from 'node:stream/promises';
3import { setTimeout } from 'node:timers/promises';
4const defaultLambdaHandler = ()=>{};
5const defaultPlugin = {
6 timeoutEarlyInMillis: 5,
7 timeoutEarlyResponse: ()=>{
8 throw new Error('Timeout');
9 },
10 streamifyResponse: false // Deprecate need for this when AWS provides a flag for when it's looking for it
11};
12const middy = (lambdaHandler = defaultLambdaHandler, plugin = {})=>{
13 // Allow base handler to be set using .handler()
14 if (typeof lambdaHandler !== 'function') {
15 plugin = lambdaHandler;
16 lambdaHandler = defaultLambdaHandler;
17 }
18 plugin = {
19 ...defaultPlugin,
20 ...plugin
21 };
22 plugin.timeoutEarly = plugin.timeoutEarlyInMillis > 0;
23 plugin.beforePrefetch?.();
24 const beforeMiddlewares = [];
25 const afterMiddlewares = [];
26 const onErrorMiddlewares = [];
27 const middyHandler = (event = {}, context = {})=>{
28 plugin.requestStart?.();
29 const request = {
30 event,
31 context,
32 response: undefined,
33 error: undefined,
34 internal: plugin.internal ?? {}
35 };
36 return runRequest(request, [
37 ...beforeMiddlewares
38 ], lambdaHandler, [
39 ...afterMiddlewares
40 ], [
41 ...onErrorMiddlewares
42 ], plugin);
43 };
44 const middy = plugin.streamifyResponse ? awslambda.streamifyResponse(async (event, responseStream, context)=>{
45 const handlerResponse = await middyHandler(event, context);
46 let handlerBody = handlerResponse;
47 if (handlerResponse.statusCode) {
48 handlerBody = handlerResponse.body ?? '';
49 responseStream = awslambda.HttpResponseStream.from(responseStream, handlerResponse);
50 }
51 // Source @datastream/core (MIT)
52 let handlerStream;
53 if (handlerBody._readableState) {
54 handlerStream = handlerBody;
55 } else if (typeof handlerBody === 'string') {
56 function* iterator(input) {
57 const size = 16384 // 16 * 1024 // Node.js default
58 ;
59 let position = 0;
60 const length = input.length;
61 while(position < length){
62 yield input.substring(position, position + size);
63 position += size;
64 }
65 }
66 handlerStream = Readable.from(iterator(handlerBody));
67 }
68 if (!handlerStream) {
69 throw new Error('handler response not a ReadableStream');
70 }
71 await pipeline(handlerStream, responseStream);
72 }) : middyHandler;
73 middy.use = (middlewares)=>{
74 if (!Array.isArray(middlewares)) {
75 middlewares = [
76 middlewares
77 ];
78 }
79 for (const middleware of middlewares){
80 const { before, after, onError } = middleware;
81 if (!before && !after && !onError) {
82 throw new Error('Middleware must be an object containing at least one key among "before", "after", "onError"');
83 }
84 if (before) middy.before(before);
85 if (after) middy.after(after);
86 if (onError) middy.onError(onError);
87 }
88 return middy;
89 };
90 // Inline Middlewares
91 middy.before = (beforeMiddleware)=>{
92 beforeMiddlewares.push(beforeMiddleware);
93 return middy;
94 };
95 middy.after = (afterMiddleware)=>{
96 afterMiddlewares.unshift(afterMiddleware);
97 return middy;
98 };
99 middy.onError = (onErrorMiddleware)=>{
100 onErrorMiddlewares.unshift(onErrorMiddleware);
101 return middy;
102 };
103 middy.handler = (replaceLambdaHandler)=>{
104 lambdaHandler = replaceLambdaHandler;
105 return middy;
106 };
107 return middy;
108};
109const runRequest = async (request, beforeMiddlewares, lambdaHandler, afterMiddlewares, onErrorMiddlewares, plugin)=>{
110 let timeoutAbort;
111 const timeoutEarly = plugin.timeoutEarly && request.context.getRemainingTimeInMillis // disable when AWS context missing (tests, containers)
112 ;
113 try {
114 await runMiddlewares(request, beforeMiddlewares, plugin);
115 // Check if before stack hasn't exit early
116 if (typeof request.response === 'undefined') {
117 plugin.beforeHandler?.();
118 const handlerAbort = new AbortController();
119 if (timeoutEarly) timeoutAbort = new AbortController();
120 request.response = await Promise.race([
121 lambdaHandler(request.event, request.context, {
122 signal: handlerAbort.signal
123 }),
124 timeoutEarly ? setTimeout(request.context.getRemainingTimeInMillis() - plugin.timeoutEarlyInMillis, undefined, {
125 signal: timeoutAbort.signal
126 }).then(()=>{
127 handlerAbort.abort();
128 return plugin.timeoutEarlyResponse();
129 }) : Promise.race([])
130 ]);
131 timeoutAbort?.abort() // lambdaHandler may not be a promise
132 ;
133 plugin.afterHandler?.();
134 await runMiddlewares(request, afterMiddlewares, plugin);
135 }
136 } catch (e) {
137 timeoutAbort?.abort() // timeout should be aborted on errors
138 ;
139 // Reset response changes made by after stack before error thrown
140 request.response = undefined;
141 request.error = e;
142 try {
143 await runMiddlewares(request, onErrorMiddlewares, plugin);
144 } catch (e) {
145 // Save error that wasn't handled
146 e.originalError = request.error;
147 request.error = e;
148 throw request.error;
149 }
150 // Catch if onError stack hasn't handled the error
151 if (typeof request.response === 'undefined') throw request.error;
152 } finally{
153 await plugin.requestEnd?.(request);
154 }
155 return request.response;
156};
157const runMiddlewares = async (request, middlewares, plugin)=>{
158 for (const nextMiddleware of middlewares){
159 plugin.beforeMiddleware?.(nextMiddleware.name);
160 const res = await nextMiddleware(request);
161 plugin.afterMiddleware?.(nextMiddleware.name);
162 // short circuit chaining and respond early
163 if (typeof res !== 'undefined') {
164 request.response = res;
165 return;
166 }
167 }
168};
169export default middy;
170