177 lines
6.1 KiB
Go
177 lines
6.1 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log/slog"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
)
|
|
|
|
const (
|
|
queueMaxAttempts = 30 // максимум попыток
|
|
queueMaxAge = 24 * time.Hour // дропаем сообщения старше 24 часов
|
|
queueBatchSize = 10
|
|
)
|
|
|
|
// retryDelay возвращает задержку перед следующей попыткой (экспоненциально).
|
|
func retryDelay(attempt int) time.Duration {
|
|
switch {
|
|
case attempt < 3:
|
|
return 10 * time.Second
|
|
case attempt < 6:
|
|
return 30 * time.Second
|
|
case attempt < 10:
|
|
return 1 * time.Minute
|
|
default:
|
|
return 2 * time.Minute
|
|
}
|
|
}
|
|
|
|
// enqueueTg2Max ставит сообщение TG→MAX в очередь.
|
|
func (b *Bridge) enqueueTg2Max(tgChatID int64, tgMsgID int, maxChatID int64, text, attType, attToken, replyTo, format string) {
|
|
now := time.Now().Unix()
|
|
item := &QueueItem{
|
|
Direction: "tg2max",
|
|
SrcChatID: tgChatID,
|
|
DstChatID: maxChatID,
|
|
SrcMsgID: strconv.Itoa(tgMsgID),
|
|
Text: text,
|
|
AttType: attType,
|
|
AttToken: attToken,
|
|
ReplyTo: replyTo,
|
|
Format: format,
|
|
CreatedAt: now,
|
|
NextRetry: now + int64(retryDelay(0).Seconds()),
|
|
}
|
|
if err := b.repo.EnqueueSend(item); err != nil {
|
|
slog.Error("enqueue failed", "err", err)
|
|
} else {
|
|
slog.Info("enqueued for retry", "dir", "tg2max", "dst", maxChatID)
|
|
}
|
|
}
|
|
|
|
// enqueueMax2Tg ставит сообщение MAX→TG в очередь.
|
|
func (b *Bridge) enqueueMax2Tg(maxChatID, tgChatID int64, maxMid, text, attType, attURL, parseMode string) {
|
|
now := time.Now().Unix()
|
|
item := &QueueItem{
|
|
Direction: "max2tg",
|
|
SrcChatID: maxChatID,
|
|
DstChatID: tgChatID,
|
|
SrcMsgID: maxMid,
|
|
Text: text,
|
|
AttType: attType,
|
|
AttURL: attURL,
|
|
ParseMode: parseMode,
|
|
CreatedAt: now,
|
|
NextRetry: now + int64(retryDelay(0).Seconds()),
|
|
}
|
|
if err := b.repo.EnqueueSend(item); err != nil {
|
|
slog.Error("enqueue failed", "err", err)
|
|
} else {
|
|
slog.Info("enqueued for retry", "dir", "max2tg", "dst", tgChatID)
|
|
}
|
|
}
|
|
|
|
// processQueue обрабатывает очередь — вызывается периодически.
|
|
func (b *Bridge) processQueue(ctx context.Context) {
|
|
items, err := b.repo.PeekQueue(queueBatchSize)
|
|
if err != nil {
|
|
slog.Error("peek queue failed", "err", err)
|
|
return
|
|
}
|
|
|
|
now := time.Now()
|
|
for _, item := range items {
|
|
// Слишком старое или слишком много попыток — дропаем
|
|
age := now.Sub(time.Unix(item.CreatedAt, 0))
|
|
if item.Attempts >= queueMaxAttempts || age > queueMaxAge {
|
|
slog.Warn("queue item expired", "id", item.ID, "dir", item.Direction, "attempts", item.Attempts, "age", age)
|
|
b.repo.DeleteFromQueue(item.ID)
|
|
if item.Direction == "tg2max" {
|
|
b.tg.SendMessage(ctx, item.SrcChatID, fmt.Sprintf("Сообщение не доставлено в MAX после %d попыток.", item.Attempts), nil)
|
|
}
|
|
continue
|
|
}
|
|
|
|
switch item.Direction {
|
|
case "tg2max":
|
|
b.processQueueTg2Max(ctx, item, now)
|
|
case "max2tg":
|
|
b.processQueueMax2Tg(ctx, item, now)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (b *Bridge) processQueueTg2Max(ctx context.Context, item QueueItem, now time.Time) {
|
|
mid, err := b.sendMaxDirectFormatted(ctx, item.DstChatID, item.Text, item.AttType, item.AttToken, item.ReplyTo, item.Format)
|
|
if err != nil {
|
|
errStr := err.Error()
|
|
// Permanent errors — дропаем
|
|
if strings.Contains(errStr, "403") || strings.Contains(errStr, "404") || strings.Contains(errStr, "chat.denied") {
|
|
slog.Warn("queue item dropped (permanent error)", "id", item.ID, "err", errStr)
|
|
b.repo.DeleteFromQueue(item.ID)
|
|
return
|
|
}
|
|
slog.Warn("queue retry failed", "id", item.ID, "dir", "tg2max", "attempt", item.Attempts+1, "err", err)
|
|
b.repo.IncrementAttempt(item.ID, now.Add(retryDelay(item.Attempts+1)).Unix())
|
|
return
|
|
}
|
|
slog.Info("queue retry ok", "id", item.ID, "dir", "tg2max", "mid", mid)
|
|
tgMsgID, _ := strconv.Atoi(item.SrcMsgID)
|
|
if tgMsgID > 0 {
|
|
b.repo.SaveMsg(item.SrcChatID, tgMsgID, item.DstChatID, mid)
|
|
}
|
|
b.repo.DeleteFromQueue(item.ID)
|
|
}
|
|
|
|
func (b *Bridge) processQueueMax2Tg(ctx context.Context, item QueueItem, now time.Time) {
|
|
var sentMsgID int
|
|
var err error
|
|
|
|
threadID := b.repo.GetTgThreadID(item.DstChatID)
|
|
|
|
if item.AttType != "" && item.AttURL != "" {
|
|
opts := &SendOpts{Caption: item.Text, ParseMode: item.ParseMode, ThreadID: threadID}
|
|
switch item.AttType {
|
|
case "photo":
|
|
sentMsgID, err = b.tg.SendPhoto(ctx, item.DstChatID, FileArg{URL: item.AttURL}, opts)
|
|
case "video":
|
|
sentMsgID, err = b.tg.SendVideo(ctx, item.DstChatID, FileArg{URL: item.AttURL}, opts)
|
|
case "audio":
|
|
sentMsgID, err = b.tg.SendAudio(ctx, item.DstChatID, FileArg{URL: item.AttURL}, opts)
|
|
case "file":
|
|
sentMsgID, err = b.tg.SendDocument(ctx, item.DstChatID, FileArg{URL: item.AttURL}, opts)
|
|
default:
|
|
sentMsgID, err = b.tg.SendPhoto(ctx, item.DstChatID, FileArg{URL: item.AttURL}, opts)
|
|
}
|
|
} else {
|
|
sentMsgID, err = b.tg.SendMessage(ctx, item.DstChatID, item.Text, &SendOpts{ParseMode: item.ParseMode, ThreadID: threadID})
|
|
}
|
|
|
|
if err != nil {
|
|
errStr := err.Error()
|
|
// Топики выключены — сбрасываем и повторяем без thread_id
|
|
if threadID != 0 && (strings.Contains(errStr, "message thread not found") ||
|
|
strings.Contains(errStr, "TOPIC_NOT_FOUND") ||
|
|
strings.Contains(errStr, "topics are disabled")) {
|
|
slog.Info("queue: forum topics disabled, resetting thread_id", "tgChat", item.DstChatID)
|
|
b.repo.SetTgThreadID(item.DstChatID, 0)
|
|
b.repo.IncrementAttempt(item.ID, now.Unix()) // retry immediately
|
|
return
|
|
}
|
|
if strings.Contains(errStr, "TOPIC_CLOSED") || strings.Contains(errStr, "403") || strings.Contains(errStr, "chat not found") {
|
|
slog.Warn("queue item dropped (permanent error)", "id", item.ID, "dir", "max2tg", "err", errStr)
|
|
b.repo.DeleteFromQueue(item.ID)
|
|
return
|
|
}
|
|
slog.Warn("queue retry failed", "id", item.ID, "dir", "max2tg", "attempt", item.Attempts+1, "err", err)
|
|
b.repo.IncrementAttempt(item.ID, now.Add(retryDelay(item.Attempts+1)).Unix())
|
|
return
|
|
}
|
|
slog.Info("queue retry ok", "id", item.ID, "dir", "max2tg", "msgID", sentMsgID)
|
|
b.repo.SaveMsg(item.DstChatID, sentMsgID, item.SrcChatID, item.SrcMsgID)
|
|
b.repo.DeleteFromQueue(item.ID)
|
|
}
|