UNPKG

10.9 kBJavaScriptView Raw
'use strict';
// TypeScript-emitted interop helper: wraps a plain CommonJS export as
// `{ default: mod }` unless the module is already flagged as an ES module.
var __importDefault = (this && this.__importDefault) || function (mod) {
    return (mod && mod.__esModule) ? mod : { "default": mod };
};
Object.defineProperty(exports, "__esModule", { value: true });
const os_1 = __importDefault(require("os"));
const utils_1 = require("@terascope/utils");
const cpuCount = os_1.default.cpus().length;
// Default worker count: one per logical CPU, capped at 5. os.cpus() can
// return an empty array when CPU information is unavailable, so clamp to 1;
// otherwise the default (0) would be rejected by the `workers` validator.
const workers = Math.max(1, Math.min(cpuCount, 5));
/**
 * Build the schema describing a job configuration.
 *
 * Every entry has a `default`, a `doc` string and a `format`; a `format`
 * function throws an `Error` when the value is invalid. When
 * `context.sysconfig.teraslice.cluster_manager_type` is `'kubernetes'`, the
 * `targets`, `cpu`, `memory`, `volumes` and `kubernetes_image` entries are
 * added as well.
 *
 * @param {object} context - must expose `sysconfig.teraslice`; connection
 *   names are read from `sysconfig.terafoundation.connectors` when present.
 * @returns {object} the schema keyed by job setting name.
 */
function jobSchema(context) {
    /** List every connection name defined under any configured connector. */
    function getConnectionNames() {
        const connectorsObject = (context.sysconfig.terafoundation
            && context.sysconfig.terafoundation.connectors) || {};
        const connectors = Object.values(connectorsObject);
        // `conn || {}` tolerates a connector type that is configured as null.
        return utils_1.flatten(connectors.map((conn) => Object.keys(conn || {})));
    }

    /**
     * Throw unless `val` coerces to a number that is >= `min`.
     * Coercion is intentional (same semantics as the global `isNaN`), so
     * numeric strings such as "3" keep being accepted.
     */
    function assertNumberAtLeast(field, val, min, minLabel) {
        if (Number.isNaN(Number(val))) {
            throw new Error(`${field} parameter for job must be a number`);
        }
        if (val < min) {
            throw new Error(`${field} for job must be >= ${minLabel}`);
        }
    }

    /** Throw unless `arr` is an array whose entries all define every property in `props`. */
    function assertEntriesHave(field, arr, props) {
        if (!Array.isArray(arr)) {
            throw new Error(`${field} need to be of type array`);
        }
        for (const entry of arr) {
            for (const prop of props) {
                if (entry == null || entry[prop] == null) {
                    // JSON.stringify so the offending entry is readable
                    // instead of "[object Object]".
                    throw new Error(`${field} need to have a ${prop}: ${JSON.stringify(entry)}`);
                }
            }
        }
    }

    const schemas = {
        analytics: {
            default: true,
            doc: 'logs the time it took in milliseconds for each action, '
                + 'as well as the number of docs it receives',
            format: Boolean,
        },
        assets: {
            default: null,
            doc: 'An array of strings identifying the assets the job depends on',
            format(arr) {
                if (arr != null) {
                    if (!Array.isArray(arr)) {
                        throw new Error('assets need to be of type array');
                    }
                    if (!arr.every((val) => typeof val === 'string')) {
                        throw new Error('assets needs to be an array of strings');
                    }
                }
            },
        },
        lifecycle: {
            default: 'once',
            doc: 'Job lifecycle behavior, determines if it should exit on completion or remain active',
            format: ['once', 'persistent'],
        },
        max_retries: {
            default: 3,
            doc: 'the number of times a worker will attempt to process '
                + 'the same slice after a error has occurred',
            format(val) {
                assertNumberAtLeast('max_retries', val, 0, 'zero');
            },
        },
        name: {
            default: 'Custom Job',
            doc: 'Name for specific job',
            format(val) {
                if (!val) {
                    throw new Error('name for job is required');
                }
                else if (typeof val !== 'string') {
                    throw new Error(' name for job must be a string');
                }
            },
        },
        operations: {
            default: [],
            doc: 'An array of actions to execute, typically the first is a reader '
                + 'and the last is a sender with '
                + 'any number of processing function in-between',
            format(arr) {
                if (!(Array.isArray(arr) && arr.length >= 2)) {
                    throw new Error('Operations need to be of type array with at least two operations in it');
                }
                const connections = getConnectionNames();
                for (const op of arr) {
                    if (!op || !utils_1.isPlainObject(op)) {
                        throw new Error(`Invalid Operation config in operations, got ${utils_1.getTypeOf(op)}`);
                    }
                    if (op.connection && !connections.includes(op.connection)) {
                        throw new Error(`Operation ${op._op} refers to connection "${op.connection}" which is unavailable`);
                    }
                }
            },
        },
        apis: {
            default: [],
            doc: `An array of apis to load and any configurations they require.
            Validated similar to operations, with the exception of no apis are required.
            The _name property is required, and it is required to be unique
            but can be suffixed with an identifier by using the format "example:0",
            anything after the ":" is stripped out when searching for the file or folder.`,
            format(arr) {
                if (!Array.isArray(arr)) {
                    throw new Error('APIs is required to be an array');
                }
                const connections = getConnectionNames();
                const names = [];
                for (const api of arr) {
                    if (!api || !utils_1.isPlainObject(api)) {
                        throw new Error(`Invalid API config in apis, got ${utils_1.getTypeOf(api)}`);
                    }
                    if (!api._name) {
                        throw new Error('API requires an _name');
                    }
                    if (names.includes(api._name)) {
                        throw new Error(`Duplicate API configurations for ${api._name} found`);
                    }
                    names.push(api._name);
                    if (api.connection && !connections.includes(api.connection)) {
                        throw new Error(`API ${api._name} refers to connection "${api.connection}" which is unavailable`);
                    }
                }
            },
        },
        probation_window: {
            default: 300000,
            doc: 'time in ms that the execution controller checks for failed slices, '
                + 'if there are none then it updates the state of the execution to running '
                + '(this is only when lifecycle is set to persistent)',
            format(val) {
                assertNumberAtLeast('probation_window', val, 1, 'one');
            },
        },
        recycle_worker: {
            default: null,
            doc: 'The number of slices a worker processes before it exits and restarts',
            format(val) {
                // null/undefined means the setting is not in use.
                if (val != null) {
                    assertNumberAtLeast('recycle_worker', val, 0, 'zero');
                }
            },
        },
        slicers: {
            default: 1,
            doc: 'how many parallel slicer contexts that will run within the slicer',
            format(val) {
                assertNumberAtLeast('slicers', val, 1, 'one');
            },
        },
        workers: {
            default: workers,
            doc: 'the number of workers dedicated for the job',
            format(val) {
                assertNumberAtLeast('workers', val, 1, 'one');
            },
        },
    };

    const clusteringType = context.sysconfig.teraslice.cluster_manager_type;
    if (clusteringType === 'kubernetes') {
        schemas.targets = {
            default: [],
            doc: 'array of key/value labels used for targeting teraslice jobs to nodes',
            format(arr) {
                assertEntriesHave('targets', arr, ['key', 'value']);
            },
        };
        schemas.cpu = {
            doc: 'number of cpus to reserve per teraslice worker in kubernetes',
            default: undefined,
            format: 'Number',
        };
        schemas.memory = {
            doc: 'memory, in bytes, to reserve per teraslice worker in kubernetes',
            default: undefined,
            format: 'Number',
        };
        schemas.volumes = {
            default: [],
            doc: 'array of volumes to be mounted by job workers',
            format(arr) {
                assertEntriesHave('volumes', arr, ['name', 'path']);
            },
        };
        schemas.kubernetes_image = {
            doc: 'Specify a custom image name for kubernetes, this only applies to kubernetes systems',
            default: undefined,
            format: 'optional_String',
        };
    }
    return schemas;
}
// Publish the schema factory under its primary name and the `makeJobSchema` alias.
Object.assign(exports, { jobSchema, makeJobSchema: jobSchema });
220exports.opSchema = {
221 _op: {
222 default: '',
223 doc: 'Name of operation, , it must reflect the name of the file or folder',
224 format: 'required_String',
225 },
226 _encoding: {
227 doc: 'Used for specifying the data encoding type when using `DataEntity.fromBuffer`. Defaults to `json`.',
228 default: 'json',
229 format: utils_1.dataEncodings,
230 },
231 _dead_letter_action: {
232 doc: `This action will specify what to do when failing to parse or transform a record. ​​​​​
233​​​​​The following builtin actions are supported: ​​​
234​​​​​ - "throw": throw the original error ​​​​​
235​​​​​ - "log": log the error and the data ​​​​​
236​​​​​ - "none": (default) skip the error entirely
237
238​​​​​If none of the actions are specified it will try and use a registered Dead Letter Queue API under that name.
239The API must be already be created by a operation before it can used.​`.trim(),
240 default: 'none',
241 format: 'optional_String',
242 }
243};
244exports.apiSchema = {
245 _name: {
246 default: '',
247 doc: `The _name property is required, and it is required to be unqiue
248 but can be suffixed with a identifier by using the format "example:0",
249 anything after the ":" is stripped out when searching for the file or folder.`,
250 format: 'required_String',
251 }
252};
253//# sourceMappingURL=job-schemas.js.map
\No newline at end of file