Files
2026-05-27 09:55:11 +09:00

276 lines
9.2 KiB
Go

package main
import (
"context"
"crypto/sha256"
"encoding/hex"
"log/slog"
"net/http"
"sync"
"time"
maxbot "github.com/max-messenger/max-bot-api-client-go"
)
// Config — настройки bridge, читаемые из env.
type Config struct {
MaxToken string // токен MAX API (нужен для direct-send/upload)
TgBotURL string // ссылка на TG-бота для /help
MaxBotURL string // ссылка на MAX-бота для /help
WebhookURL string // базовый URL для webhook (если пусто — long polling)
WebhookPort string // порт для webhook сервера
TgAPIURL string // custom TG Bot API URL (если пусто — api.telegram.org)
AllowedUsers []int64 // whitelist TG user IDs (empty = allow all)
TgMaxFileSizeMB int // max file size TG->MAX in MB (0 = unlimited)
MaxMaxFileSizeMB int // max file size MAX->TG in MB (0 = unlimited)
// MaxAllowedExts — whitelist расширений для TG→MAX (nil = не проверять локально).
// Если задан, файлы с не-вхождением блокируются до отправки на CDN.
MaxAllowedExts map[string]struct{}
// MessageNewline — если true, текст идёт с новой строки после имени отправителя:
// "Имя:\nтекст" вместо "Имя: текст". Задаётся через env MESSAGE_FORMAT=newline.
MessageNewline bool
}
// chatBreaker хранит состояние circuit breaker для одного чата.
type chatBreaker struct {
fails int
blockedAt time.Time
}
const (
cbMaxFails = 3 // после N фейлов — блокируем
cbCooldown = 5 * time.Minute // на сколько блокируем
)
// Bridge — основная структура, объединяющая зависимости.
type Bridge struct {
cfg Config
repo Repository
tg TGSender
maxApi *maxbot.Api
httpClient *http.Client // для скачивания/загрузки файлов (большой таймаут)
apiClient *http.Client // для коротких API-запросов (малый таймаут)
whSecret string // random path segment for webhook URLs
cpWaitMu sync.Mutex
cpWait map[int64]int64 // MAX userId → TG channel ID (ожидание пересылки)
cpTgOwnerMu sync.Mutex
cpTgOwner map[int64]int64 // TG channel ID → TG user ID (кто переслал пост)
cbMu sync.Mutex
breakers map[int64]*chatBreaker // destination chatID → breaker
// Буферизация TG media groups (альбомы)
mgMu sync.Mutex
mgBuffers map[string]*mediaGroupBuffer // MediaGroupID → buffer
}
// NewBridge создаёт экземпляр Bridge.
func NewBridge(cfg Config, repo Repository, tg TGSender, maxApi *maxbot.Api) *Bridge {
// Derive webhook secret from tokens (stable across restarts)
h := sha256.Sum256([]byte(cfg.MaxToken + tg.BotToken()))
secret := hex.EncodeToString(h[:8])
return &Bridge{
cfg: cfg,
repo: repo,
tg: tg,
maxApi: maxApi,
httpClient: &http.Client{
Timeout: 5 * time.Minute, // для download/upload больших файлов
},
apiClient: &http.Client{
Timeout: 15 * time.Second, // для коротких API-запросов
},
whSecret: secret,
cpWait: make(map[int64]int64),
cpTgOwner: make(map[int64]int64),
breakers: make(map[int64]*chatBreaker),
mgBuffers: make(map[string]*mediaGroupBuffer),
}
}
// cbBlocked проверяет, заблокирован ли чат.
func (b *Bridge) cbBlocked(chatID int64) bool {
b.cbMu.Lock()
defer b.cbMu.Unlock()
cb, ok := b.breakers[chatID]
if !ok {
return false
}
if cb.fails >= cbMaxFails && time.Since(cb.blockedAt) < cbCooldown {
return true
}
if cb.fails >= cbMaxFails {
// Кулдаун прошёл — сбрасываем, пробуем снова
delete(b.breakers, chatID)
}
return false
}
// cbFail регистрирует ошибку. Возвращает true если чат только что заблокировался.
func (b *Bridge) cbFail(chatID int64) bool {
b.cbMu.Lock()
defer b.cbMu.Unlock()
cb, ok := b.breakers[chatID]
if !ok {
cb = &chatBreaker{}
b.breakers[chatID] = cb
}
cb.fails++
if cb.fails == cbMaxFails {
cb.blockedAt = time.Now()
slog.Warn("circuit breaker: chat blocked", "chatID", chatID, "cooldown", cbCooldown)
return true
}
return false
}
// cbSuccess сбрасывает счётчик ошибок для чата.
func (b *Bridge) cbSuccess(chatID int64) {
b.cbMu.Lock()
defer b.cbMu.Unlock()
delete(b.breakers, chatID)
}
// maxMaxFileBytes returns the MAX-to-TG file size limit in bytes (0 = unlimited).
func (c *Config) maxMaxFileBytes() int64 {
if c.MaxMaxFileSizeMB <= 0 {
return 0
}
return int64(c.MaxMaxFileSizeMB) * 1024 * 1024
}
// isUserAllowed проверяет, есть ли tgUserID в белом списке.
// Если AllowedUsers пуст — доступ разрешён всем.
func (b *Bridge) isUserAllowed(tgUserID int64) bool {
if len(b.cfg.AllowedUsers) == 0 {
return true
}
for _, id := range b.cfg.AllowedUsers {
if id == tgUserID {
return true
}
}
return false
}
// checkUserAllowed проверяет доступ пользователя и отправляет сообщение об отказе если нужно.
// Возвращает true если доступ разрешён, false — если запрещён (и уже отправил ответ).
// userID == 0 трактуется как «нет отправителя» — доступ запрещается.
func (b *Bridge) checkUserAllowed(ctx context.Context, chatID, userID int64, threadID int) bool {
if userID != 0 && b.isUserAllowed(userID) {
return true
}
slog.Debug("TG user not allowed", "uid", userID)
b.tg.SendMessage(ctx, chatID, "У вас нет прав доступа к боту.", &SendOpts{ThreadID: threadID})
return false
}
// isCrosspostOwner проверяет, является ли userID владельцем связки.
// owner_id=0 и tg_owner_id=0 — старая связка, доступна всем.
func (b *Bridge) isCrosspostOwner(maxChatID, userID int64) bool {
maxOwner, tgOwner := b.repo.GetCrosspostOwner(maxChatID)
if maxOwner == 0 && tgOwner == 0 {
return true // legacy, no owner
}
return userID == maxOwner || userID == tgOwner
}
// tgFileURL возвращает прямой URL файла из TG — через custom API если настроен.
func (b *Bridge) tgFileURL(ctx context.Context, fileID string) (string, error) {
filePath, err := b.tg.GetFile(ctx, fileID)
if err != nil {
return "", err
}
return b.tg.GetFileDirectURL(filePath), nil
}
// tgChatTitle возвращает title TG-чата/канала по ID. Пустая строка если не удалось.
func (b *Bridge) tgChatTitle(ctx context.Context, chatID int64) string {
title, err := b.tg.GetChat(ctx, chatID)
if err != nil {
return ""
}
return title
}
func (b *Bridge) tgWebhookPath() string {
return "/tg-webhook-" + b.whSecret
}
func (b *Bridge) maxWebhookPath() string {
return "/max-webhook-" + b.whSecret
}
// registerCommands регистрирует команды бота в Telegram.
func (b *Bridge) registerCommands(ctx context.Context) {
cmds := []BotCommand{
{Command: "bridge", Description: "Связать чат с MAX-чатом"},
{Command: "unbridge", Description: "Удалить связку чатов"},
{Command: "thread", Description: "Установить топик для сообщений из MAX"},
{Command: "crosspost", Description: "Список связок кросспостинга"},
{Command: "help", Description: "Инструкция"},
}
if err := b.tg.SetMyCommands(ctx, cmds, nil); err != nil {
slog.Error("TG setMyCommands (default) failed", "err", err)
}
if err := b.tg.SetMyCommands(ctx, cmds, &CommandScope{Type: "all_chat_administrators"}); err != nil {
slog.Error("TG setMyCommands (admins) failed", "err", err)
}
}
// Run запускает TG и MAX listener'ы + периодическую очистку.
func (b *Bridge) Run(ctx context.Context) {
b.registerCommands(ctx)
go func() {
t := time.NewTicker(10 * time.Minute)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
b.repo.CleanOldMessages()
}
}
}()
// Воркер очереди — проверяет каждые 10 секунд
go func() {
t := time.NewTicker(10 * time.Second)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
b.processQueue(ctx)
}
}
}()
if b.cfg.WebhookURL != "" {
go func() {
addr := ":" + b.cfg.WebhookPort
srv := &http.Server{
Addr: addr,
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
IdleTimeout: 60 * time.Second,
}
slog.Info("Webhook server starting", "addr", addr)
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
slog.Error("Webhook server failed", "err", err)
}
}()
}
var wg sync.WaitGroup
wg.Add(2)
go func() { defer wg.Done(); b.listenTelegram(ctx) }()
go func() { defer wg.Done(); b.listenMax(ctx) }()
wg.Wait()
}