198 lines
6.7 KiB
TypeScript
198 lines
6.7 KiB
TypeScript
// worker.ts
|
|
import {Injectable, Logger} from "@nestjs/common";
|
|
import {SqsPostService} from "./sqs.post.service";
|
|
import {SUPPORT_SOCIAL_PROVIDERS, XPosterRouterService, XStrategy} from "../x-poster/x-poster.router.service";
|
|
import {rand} from "../helper";
|
|
import {FacebookApi} from "../x-poster/facebook.api";
|
|
import {NotifyService} from "../notify.service";
|
|
|
|
@Injectable()
|
|
export class SqsPosterWorker {
|
|
private readonly logger = new Logger(SqsPosterWorker.name);
|
|
|
|
constructor(
|
|
private readonly sqs: SqsPostService,
|
|
private readonly xRouterService: XPosterRouterService,
|
|
private readonly facebookApi: FacebookApi,
|
|
private readonly notifyService: NotifyService,
|
|
) {
|
|
}
|
|
|
|
async start() {
|
|
console.log(`🚀 Worker started for ${await this.sqs.getQueueName()}`);
|
|
await this.notifyService.sendMessageToTele(`🚀 Worker started for ${await this.sqs.getQueueName()}`)
|
|
|
|
//check cookie
|
|
this.xRouterService.verifyCookie();
|
|
|
|
let ReceiptHandle = '';
|
|
while (true) {
|
|
try {
|
|
console.log('worker get message ...');
|
|
const msg = await this.sqs.getMessage();
|
|
|
|
if (!msg) {
|
|
console.log('no message , sleeping...');
|
|
await this.sleep(10000); //sleep 10s
|
|
continue;
|
|
}
|
|
|
|
const {raw, body} = msg;
|
|
ReceiptHandle = raw.ReceiptHandle!;
|
|
|
|
// 👉 ack (xóa message)
|
|
await this.sqs.deleteMessage(ReceiptHandle);
|
|
//ReceiptHandle = '';
|
|
|
|
// 👉 xử lý job
|
|
await this.process(body);
|
|
|
|
} catch (err) {
|
|
console.error('❌ Worker error:', err);
|
|
if (ReceiptHandle) {
|
|
await this.sqs.deleteMessage(ReceiptHandle!);
|
|
}
|
|
await this.sleep(60000); // tránh spam CPU khi lỗi
|
|
} finally {
|
|
ReceiptHandle = '';
|
|
}
|
|
}
|
|
}
|
|
|
|
private async process(data: any) {
|
|
console.log('📩 Got job:', data);
|
|
const {type, content, xSubmitProvider, tweetUrl, publishTo, tweetId, telegramChatId} = data;
|
|
switch (type) {
|
|
case 'X_POSTER_TWEET': {
|
|
await this.doPostTweet(
|
|
content,
|
|
publishTo,
|
|
xSubmitProvider);
|
|
break;
|
|
}
|
|
case 'X_POSTER_REPLY': {
|
|
await this.doReplyTweet(
|
|
content,
|
|
tweetUrl,
|
|
tweetId,
|
|
xSubmitProvider
|
|
);
|
|
break;
|
|
}
|
|
case 'X_POSTER_QUOTE': {
|
|
await this.doQuoteTweet(
|
|
content,
|
|
tweetUrl,
|
|
tweetId,
|
|
xSubmitProvider
|
|
);
|
|
break;
|
|
}
|
|
}
|
|
|
|
// TODO: gọi puppeteer / API X
|
|
// ví dụ:
|
|
// await postToX(data.content);
|
|
|
|
// giả lập delay
|
|
await this.sleep(rand(7, 10) * 1000); //nghỉ 10s
|
|
}
|
|
|
|
private sleep(ms: number) {
|
|
return new Promise(res => setTimeout(res, ms));
|
|
}
|
|
|
|
private async doPostTweet(
|
|
text: string,
|
|
publishTo: Array<string> = ['fb'],
|
|
strategy: string = XStrategy.API_ONLY,
|
|
) {
|
|
try {
|
|
console.log(`==> doPostTweet`, publishTo);
|
|
let sendSuccess = false;
|
|
if (publishTo.includes(SUPPORT_SOCIAL_PROVIDERS.FB)) {
|
|
console.log(`==> doPostTweet publish to fb`);
|
|
await this.facebookApi.postToPage(text);
|
|
await this.notifyService.sendMessageToTele(`Post to FB success`);
|
|
sendSuccess = true;
|
|
}
|
|
if (publishTo.includes(SUPPORT_SOCIAL_PROVIDERS.X)) {
|
|
console.log(`==> doPostTweet publish to X`);
|
|
|
|
// @ts-ignore
|
|
const r = await this.xRouterService.postTweet({text, strategy});
|
|
if (r.success) {
|
|
await this.notifyService.sendMessageToTele(`Post to X success!`);
|
|
} else {
|
|
const mergerError = r.attempts.map(m => m.error).join(", ");
|
|
this.logger.error(mergerError);
|
|
await this.notifyService.sendMessageToTele(`==>.doPostTweet X got error ${mergerError}`)
|
|
}
|
|
sendSuccess = true;
|
|
}
|
|
|
|
|
|
return sendSuccess
|
|
} catch (e) {
|
|
this.logger.error(`doPostTweet error:${e.message}`);
|
|
await this.notifyService.sendMessageToTele(`==> doPostTweet error:${e.message}`)
|
|
this.logger.error(e);
|
|
}
|
|
}
|
|
|
|
private async doReplyTweet(
|
|
text: string,
|
|
tweetUrl: string,
|
|
tweetId: string,
|
|
strategy: string = XStrategy.BROWSER_COOKIE
|
|
) {
|
|
try {
|
|
console.log('doReplyTweet');
|
|
// @ts-ignore
|
|
const r = await this.xRouterService.postReply({text, tweetUrl, tweetId, strategy});
|
|
if (r.success) {
|
|
await this.notifyService.sendMessageToTele(`Worker->reply message success`)
|
|
} else {
|
|
const mergerError = r.attempts.map(m => m.error).join(", ");
|
|
this.logger.error(mergerError);
|
|
await this.notifyService.sendMessageToTele(`==> doReplyTweet error ${mergerError}`)
|
|
}
|
|
return r
|
|
} catch (e) {
|
|
this.logger.error(e);
|
|
console.log("Mã lỗi:", e.code); // Ví dụ: 'ECONNABORTED' (timeout), 'ERR_NETWORK' (mất mạng)
|
|
console.log("Thông báo:", e.message);
|
|
await this.notifyService.sendMessageToTele(`Worker==> doReplyTweet error:${e.code} - ${e.message}`)
|
|
|
|
|
|
}
|
|
}
|
|
|
|
private async doQuoteTweet(
|
|
text: string,
|
|
tweetUrl: string,
|
|
tweetId: string,
|
|
strategy: string = XStrategy.BROWSER_COOKIE
|
|
) {
|
|
try {
|
|
console.log('doQuoteTweet');
|
|
// @ts-ignore
|
|
const r = await this.xRouterService.postQuote({text, tweetUrl, tweetId, strategy});
|
|
if (r.success) {
|
|
await this.notifyService.sendMessageToTele(`✅ Quote message success`)
|
|
} else {
|
|
|
|
const mergerError = r.attempts.map(m => m.error).join(", ");
|
|
|
|
this.logger.error(mergerError);
|
|
await this.notifyService.sendMessageToTele(`==> doQuoteTweet error ${mergerError}`)
|
|
|
|
}
|
|
return r
|
|
} catch (e) {
|
|
this.logger.error(e);
|
|
await this.notifyService.sendMessageToTele(`==> doQuoteTweet catch error:${e.message}`)
|
|
|
|
}
|
|
}
|
|
} |