import { APIGatewayProxyEvent, APIGatewayProxyResult, SQSEvent, } from "aws-lambda"; import { SQSClient, SendMessageCommand } from "@aws-sdk/client-sqs"; import { handler as indexSubredditHandler } from "./sqs-index.js"; import { createMockSQSEvent, ProcessTask } from "@microfox/tool-core"; import { ragSubredditPaginator } from "../helpers/ragRedis.js"; const sqsClient = new SQSClient({}); const isOffline = process.env.IS_OFFLINE === "true"; export const handler = async ( event: APIGatewayProxyEvent ): Promise => { console.log("handler: sqs-trigger-subreddit", { isOffline, body: event.body, }); const taskHandler = new ProcessTask({ url: process.env.TASK_UPSTASH_REDIS_REST_URL, token: process.env.TASK_UPSTASH_REDIS_REST_TOKEN, }); if (!event.body) { console.log("handler: missing request body"); return { statusCode: 400, body: JSON.stringify({ message: "Missing request body" }), }; } try { console.log("handler: parsing request body"); const { subreddit } = JSON.parse(event.body); console.log("handler: parsed request body", { subreddit }); if (!subreddit) { console.log("handler: missing subreddit in request body"); return { statusCode: 400, body: JSON.stringify({ message: "Missing subreddit in request body" }), }; } if (!process.env.INDEX_SUBREDDIT_QUEUE_URL && !isOffline) { console.error("INDEX_SUBREDDIT_QUEUE_URL is not set"); return { statusCode: 500, body: JSON.stringify({ message: "Internal server error: Queue not configured", }), }; } // TODO: get the subreddit info // TODO: store/update it in db const task = await taskHandler.createTask({ subreddit, }); console.log("handler: preparing SQS message"); const authHeader = event.headers["x-auth-secrets"]; const sqsMessageBody = { subreddit, xAuthSecrets: authHeader, task_id: task.id}; console.log("handler: prepared SQS message", { sqsMessageBody }); console.log("handler: checking if subreddit is stale", { subreddit }); const isStale = await ragSubredditPaginator(subreddit).isStale(60 * 60); console.log("handler: checked if subreddit is stale", { isStale }); if (!isStale) { console.log("handler: subreddit already indexed in the last hour"); return { statusCode: 202, body: JSON.stringify({ message: "Subreddit already indexed in the last hour", }), }; } if (isOffline) { console.log( "handler: running in offline mode, invoking handler directly" ); const sqsEvent = createMockSQSEvent(sqsMessageBody); indexSubredditHandler(sqsEvent); console.log("handler: invoked handler directly"); } else { console.log("handler: sending message to SQS", { queueUrl: process.env.INDEX_SUBREDDIT_QUEUE_URL, }); const command = new SendMessageCommand({ QueueUrl: process.env.INDEX_SUBREDDIT_QUEUE_URL, MessageBody: JSON.stringify(sqsMessageBody), }); await sqsClient.send(command); console.log("handler: sent message to SQS"); } console.log("handler: finished successfully"); return { statusCode: 202, body: JSON.stringify({ message: "Subreddit indexing triggered successfully", }), }; } catch (error) { console.error(error); if (error instanceof SyntaxError) { console.log("handler: invalid JSON in request body"); return { statusCode: 400, body: JSON.stringify({ message: "Invalid JSON in request body" }), }; } console.log("handler: internal server error"); return { statusCode: 500, body: JSON.stringify({ message: "Internal server error" }), }; } }; export const handleApiRequestDocs = { summary: "Trigger subreddit indexing", description: "Triggers the indexing of a specific subreddit by sending a message to the SQS queue.", tags: ["Subreddit"], requestBody: { required: true, content: { "application/json": { schema: { type: "object", properties: { subreddit: { type: "string", description: "The name of the subreddit to index.", }, }, required: ["subreddit"], }, }, }, }, responses: { "202": { description: "Accepted: The indexing request has been successfully queued.", }, "400": { description: "Bad Request: Missing or invalid request body." }, "500": { description: "Internal server error." }, }, };