first commit
This commit is contained in:
@@ -0,0 +1,23 @@
|
||||
// sqs.module.ts
|
||||
import { Module, Global } from '@nestjs/common';
|
||||
import { SqsService } from './sqs.service';
|
||||
import {SqsPostService} from "./sqs.post.service";
|
||||
import {SqsPosterWorker} from "./sqs.poster.worker";
|
||||
import {XPosterRouterService} from "../x-poster/x-poster.router.service";
|
||||
import {XPosterModule} from "../x-poster/x-poster.module";
|
||||
import {FacebookApi} from "../x-poster/facebook.api";
|
||||
import {NotifyService} from "../notify.service";
|
||||
|
||||
@Global()
|
||||
@Module({
|
||||
imports:[XPosterModule],
|
||||
providers: [
|
||||
SqsService,
|
||||
SqsPostService,
|
||||
SqsPosterWorker,
|
||||
FacebookApi,
|
||||
NotifyService,
|
||||
],
|
||||
exports: [SqsService],
|
||||
})
|
||||
export class SqsModule {}
|
||||
@@ -0,0 +1,36 @@
|
||||
// post.service.ts
|
||||
import {Injectable, Logger, OnModuleInit} from '@nestjs/common';
|
||||
import {SqsService} from "./sqs.service";
|
||||
|
||||
|
||||
@Injectable()
|
||||
export class SqsPostService implements OnModuleInit {
|
||||
|
||||
private logger = new Logger(SqsPostService.name);
|
||||
private queueName: string = process.env.SQS_POSTER_QUEUE_NAME!;
|
||||
|
||||
constructor(private readonly sqs: SqsService) {
|
||||
if (this.queueName === undefined) {
|
||||
throw new Error('Chưa set queueName')
|
||||
}
|
||||
}
|
||||
|
||||
async onModuleInit() {
|
||||
await this.sqs.ensureQueue(this.queueName)
|
||||
// throw new Error("Method not implemented.");
|
||||
}
|
||||
|
||||
async getQueueName(): Promise<string> {
|
||||
return this.queueName;
|
||||
}
|
||||
|
||||
|
||||
async getMessage() {
|
||||
return this.sqs.receive(process.env.SQS_POSTER_QUEUE_NAME!);
|
||||
}
|
||||
|
||||
async deleteMessage(receiptHandle: string) {
|
||||
this.logger.debug(`delete message: ${JSON.stringify(receiptHandle)}`);
|
||||
return this.sqs.ack(this.queueName, receiptHandle);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,172 @@
|
||||
// worker.ts
|
||||
import {Injectable, Logger} from "@nestjs/common";
|
||||
import {SqsPostService} from "./sqs.post.service";
|
||||
import {SUPPORT_SOCIAL_PROVIDERS, XPosterRouterService, XStrategy} from "../x-poster/x-poster.router.service";
|
||||
import {rand} from "../helper";
|
||||
import {FacebookApi} from "../x-poster/facebook.api";
|
||||
import {NotifyService} from "../notify.service";
|
||||
|
||||
@Injectable()
|
||||
export class SqsPosterWorker {
|
||||
private readonly logger = new Logger(SqsPosterWorker.name);
|
||||
|
||||
constructor(
|
||||
private readonly sqs: SqsPostService,
|
||||
private readonly xRouterService: XPosterRouterService,
|
||||
private readonly facebookApi: FacebookApi,
|
||||
private readonly notifyService: NotifyService,
|
||||
) {
|
||||
}
|
||||
|
||||
async start() {
|
||||
console.log(`🚀 Worker started for ${await this.sqs.getQueueName()}`);
|
||||
await this.notifyService.sendMessageToTele(`🚀 Worker started for ${await this.sqs.getQueueName()}`)
|
||||
|
||||
let ReceiptHandle = '';
|
||||
while (true) {
|
||||
try {
|
||||
console.log('worker get message ...');
|
||||
const msg = await this.sqs.getMessage();
|
||||
|
||||
if (!msg) {
|
||||
console.log('no message , sleeping...');
|
||||
await this.sleep(10000); //sleep 10s
|
||||
continue;
|
||||
}
|
||||
|
||||
const {raw, body} = msg;
|
||||
ReceiptHandle = raw.ReceiptHandle!;
|
||||
|
||||
// 👉 ack (xóa message)
|
||||
await this.sqs.deleteMessage(ReceiptHandle);
|
||||
//ReceiptHandle = '';
|
||||
|
||||
// 👉 xử lý job
|
||||
await this.process(body);
|
||||
|
||||
} catch (err) {
|
||||
console.error('❌ Worker error:', err);
|
||||
if (ReceiptHandle) {
|
||||
await this.sqs.deleteMessage(ReceiptHandle!);
|
||||
}
|
||||
await this.sleep(60000); // tránh spam CPU khi lỗi
|
||||
} finally {
|
||||
ReceiptHandle = '';
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async process(data: any) {
|
||||
console.log('📩 Got job:', data);
|
||||
const {type, content, xSubmitProvider, tweetUrl, publishTo, tweetId, telegramChatId} = data;
|
||||
switch (type) {
|
||||
case 'X_POSTER_TWEET': {
|
||||
await this.doPostTweet(
|
||||
content,
|
||||
publishTo,
|
||||
xSubmitProvider);
|
||||
break;
|
||||
}
|
||||
case 'X_POSTER_REPLY': {
|
||||
await this.doReplyTweet(
|
||||
content,
|
||||
tweetUrl,
|
||||
tweetId,
|
||||
xSubmitProvider
|
||||
);
|
||||
break;
|
||||
}
|
||||
case 'X_POSTER_QUOTE': {
|
||||
await this.doQuoteTweet(
|
||||
content,
|
||||
tweetUrl,
|
||||
tweetId,
|
||||
xSubmitProvider
|
||||
);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: gọi puppeteer / API X
|
||||
// ví dụ:
|
||||
// await postToX(data.content);
|
||||
|
||||
// giả lập delay
|
||||
await this.sleep(rand(7, 10) * 1000); //nghỉ 10s
|
||||
}
|
||||
|
||||
private sleep(ms: number) {
|
||||
return new Promise(res => setTimeout(res, ms));
|
||||
}
|
||||
|
||||
private async doPostTweet(
|
||||
text: string,
|
||||
publishTo: Array<string> = ['fb'],
|
||||
strategy: string = XStrategy.API_ONLY,
|
||||
) {
|
||||
try {
|
||||
console.log(`==> doPostTweet`, publishTo);
|
||||
let sendSuccess = false;
|
||||
if (publishTo.includes(SUPPORT_SOCIAL_PROVIDERS.FB)) {
|
||||
console.log(`==> doPostTweet publish to fb`);
|
||||
await this.facebookApi.postToPage(text);
|
||||
await this.notifyService.sendMessageToTele(`Post to FB success`);
|
||||
sendSuccess = true;
|
||||
}
|
||||
if (publishTo.includes(SUPPORT_SOCIAL_PROVIDERS.X)) {
|
||||
console.log(`==> doPostTweet publish to X`);
|
||||
|
||||
// @ts-ignore
|
||||
await this.xRouterService.postTweet({text, strategy});
|
||||
// await this.notifyService.sendMessageToTele(`Post to X success!`);
|
||||
sendSuccess = true;
|
||||
}
|
||||
|
||||
|
||||
return sendSuccess
|
||||
} catch (e) {
|
||||
this.logger.error(`doPostTweet error:${e.message}`);
|
||||
await this.notifyService.sendMessageToTele(`==> doPostTweet error:${e.message}`)
|
||||
this.logger.error(e);
|
||||
}
|
||||
}
|
||||
|
||||
private async doReplyTweet(
|
||||
text: string,
|
||||
tweetUrl: string,
|
||||
tweetId: string,
|
||||
strategy: string = XStrategy.BROWSER_COOKIE
|
||||
) {
|
||||
try {
|
||||
console.log('doReplyTweet');
|
||||
// @ts-ignore
|
||||
const r = await this.xRouterService.postReply({text, tweetUrl, tweetId, strategy});
|
||||
await this.notifyService.sendMessageToTele(`Worker->reply message success`)
|
||||
return r
|
||||
} catch (e) {
|
||||
this.logger.error(e);
|
||||
await this.notifyService.sendMessageToTele(`Worker==> doReplyTweet error:${e.message}`)
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
private async doQuoteTweet(
|
||||
text: string,
|
||||
tweetUrl: string,
|
||||
tweetId: string,
|
||||
strategy: string = XStrategy.BROWSER_COOKIE
|
||||
) {
|
||||
try {
|
||||
console.log('doQuoteTweet');
|
||||
// @ts-ignore
|
||||
const r = await this.xRouterService.postQuote({text, tweetUrl, tweetId, strategy});
|
||||
await this.notifyService.sendMessageToTele(`✅ Quote message success`)
|
||||
return r
|
||||
} catch (e) {
|
||||
this.logger.error(e);
|
||||
await this.notifyService.sendMessageToTele(`==> doQuoteTweet error:${e.message}`)
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,141 @@
|
||||
// sqs.service.ts
|
||||
import {
|
||||
SQSClient,
|
||||
SendMessageCommand,
|
||||
ReceiveMessageCommand,
|
||||
DeleteMessageCommand,
|
||||
CreateQueueCommand,
|
||||
GetQueueUrlCommand,
|
||||
} from '@aws-sdk/client-sqs';
|
||||
import {Injectable, Logger, OnModuleInit} from '@nestjs/common';
|
||||
|
||||
|
||||
@Injectable()
|
||||
export class SqsService {
|
||||
private readonly logger = new Logger(SqsService.name);
|
||||
private client: SQSClient;
|
||||
|
||||
private baseUrl = process.env.SQS_ENDPOINT + '000000000000';
|
||||
|
||||
// // 👉 define tất cả queue ở đây
|
||||
// private queues = [
|
||||
// 'post-acc1',
|
||||
// 'post-acc2',
|
||||
// ];
|
||||
|
||||
|
||||
constructor() {
|
||||
this.client = new SQSClient({
|
||||
region: 'elasticmq',
|
||||
endpoint: process.env.SQS_ENDPOINT,
|
||||
credentials: {
|
||||
accessKeyId: 'x',
|
||||
secretAccessKey: 'x',
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
getQueueUrl(name: string) {
|
||||
return `${this.baseUrl}/${name}`;
|
||||
}
|
||||
|
||||
public async ensureQueue(name: string) {
|
||||
try {
|
||||
await this.client.send(new CreateQueueCommand({
|
||||
QueueName: name,
|
||||
}));
|
||||
|
||||
this.logger.log(`Queue ensured: ${name}`);
|
||||
} catch (err) {
|
||||
this.logger.error(`Failed to ensure queue ${name}`, err);
|
||||
}
|
||||
}
|
||||
|
||||
// private async ensureQueue(name: string) {
|
||||
// try {
|
||||
// await this.client.send(new GetQueueUrlCommand({
|
||||
// QueueName: name,
|
||||
// }));
|
||||
//
|
||||
// this.logger.log(`Queue exists: ${name}`);
|
||||
// } catch (err) {
|
||||
// this.logger.warn(`Queue missing → creating: ${name}`);
|
||||
//
|
||||
// await this.client.send(new CreateQueueCommand({
|
||||
// QueueName: name,
|
||||
// Attributes: {
|
||||
// VisibilityTimeout: '60',
|
||||
// MessageRetentionPeriod: '86400', // 1 ngày
|
||||
// },
|
||||
// }));
|
||||
//
|
||||
// this.logger.log(`Queue created: ${name}`);
|
||||
// }
|
||||
// }
|
||||
|
||||
// =====================
|
||||
// Core APIs
|
||||
// =====================
|
||||
|
||||
async enqueue(name: string, data: any, opts?: {
|
||||
delaySeconds?: number;
|
||||
jobId?: string;
|
||||
}) {
|
||||
const queueUrl = this.getQueueUrl(name);
|
||||
|
||||
// console.log(`QueueUrl: ${queueUrl}`);
|
||||
|
||||
const body = {
|
||||
...data,
|
||||
_jobId: opts?.jobId,
|
||||
_ts: Date.now(),
|
||||
};
|
||||
try {
|
||||
const data = await this.client.send(new SendMessageCommand({
|
||||
QueueUrl: queueUrl,
|
||||
MessageBody: JSON.stringify(body),
|
||||
DelaySeconds: opts?.delaySeconds || 0,
|
||||
}))
|
||||
console.log("Gửi thành công! MessageId:", data.MessageId);
|
||||
} catch (err) {
|
||||
console.error("Lỗi khi gửi tin nhắn:", err.message);
|
||||
// throw err;
|
||||
}
|
||||
// return this.client.send(new SendMessageCommand({
|
||||
// QueueUrl: queueUrl,
|
||||
// MessageBody: JSON.stringify(body),
|
||||
// DelaySeconds: opts?.delaySeconds || 0,
|
||||
// })).then(()=> {
|
||||
// this.logger.log(`Queue enqueued: ${name}`);
|
||||
// });
|
||||
}
|
||||
|
||||
async receive(queueName: string) {
|
||||
const queueUrl = this.getQueueUrl(queueName);
|
||||
|
||||
const res = await this.client.send(new ReceiveMessageCommand({
|
||||
QueueUrl: queueUrl,
|
||||
MaxNumberOfMessages: 1,
|
||||
WaitTimeSeconds: 10,
|
||||
VisibilityTimeout: 180,
|
||||
}));
|
||||
|
||||
if (!res.Messages?.length) return null;
|
||||
|
||||
const msg = res.Messages[0];
|
||||
|
||||
return {
|
||||
raw: msg,
|
||||
body: JSON.parse(msg.Body!),
|
||||
};
|
||||
}
|
||||
|
||||
async ack(name: string, receiptHandle: string) {
|
||||
const queueUrl = this.getQueueUrl(name);
|
||||
|
||||
await this.client.send(new DeleteMessageCommand({
|
||||
QueueUrl: queueUrl,
|
||||
ReceiptHandle: receiptHandle,
|
||||
}));
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user