КейсыБлог
Nikita Leino 4 дек. 2025 г. GoNatsLibrary

Как мы укротили NATS в Go: Представляем deez-nats — наш ответ на сложность микросервисов

От разработчиков для разработчиков: почему мы устали писать бойлерплейт и создали свой фреймворк.

Привет! На связи команда разработки.

В нашей архитектуре мы делаем большую ставку на NATS. Это фантастическая технология: быстрая, надежная, с поддержкой как простых Pub/Sub (Core), так и персистентных стримов (JetStream). Но по мере роста количества микросервисов мы столкнулись с проблемой, знакомой многим go-разработчикам.

Мы тратили 30% времени на бизнес-логику и 70% — на "обвязку" NATS. Бесконечные nc.Subscribe, ручной маршалинг JSON, обработка ошибок, управление таймаутами в RPC-вызовах... Код раздувался, а читаемость падала.

Нам нужен был инструмент, который бы давал удобство HTTP-фреймворков (вроде Gin или Echo), но для асинхронного месседжинга. Не найдя идеального решения, мы написали своё.

Встречайте deez-nats — библиотеку, которая превращает работу с NATS в удовольствие.

Философия deez-nats

Когда мы проектировали эту библиотеку, мы ставили во главу угла три принципа:

  1. Developer Experience (DX): API должен быть интуитивным.

  2. Type Safety: Мы любим Go за типизацию, поэтому библиотека активно использует дженерики (Generics).

  3. Production Ready: Graceful shutdown, middleware и контексты — это база, а не опция.

Давайте посмотрим, как это работает на практике.

Установка и Базовая настройка

Всё начинается стандартно. Библиотека требует Go 1.21+, так как мы используем современные фичи языка.

go get github.com/leinodev/deez-nats

В точке входа (main.go) мы инициализируем два основных сервиса: один для RPC (синхронные вызовы request-reply), другой — для событий.

package main

import (
    "context"
    "log"
    "github.com/nats-io/nats.go"
    "github.com/leinodev/deez-nats/natsevents"
    "github.com/leinodev/deez-nats/natsrpc"
)

func main() {
    // Подключение к "голому" NATS
    nc, _ := nats.Connect(nats.DefaultURL)
    defer nc.Close()

    // Инициализация наших врапперов
    // Мы можем передать опции конфигурации прямо сюда
    rpcSvc := natsrpc.New(nc, natsrpc.WithBaseRoute("my-service")) 
    eventsSvc := natsevents.New(nc)

    // ... регистрация хендлеров ...

    ctx := context.Background()
    // Запускаем слушателей в отдельных горутинах
    go rpcSvc.StartWithContext(ctx)
    go eventsSvc.StartWithContext(ctx)

    select {} // Блокируем main
}

RPC: Забываем о ручном Publish

В чистом NATS реализация паттерна Request-Reply требует ручного управления Reply топиком. Мы автоматизировали это через абстракцию RPCContext.

Вот как выглядит типичный обработчик в нашем продакшене:

type UserRequest struct {
    ID string `json:"id"`
}

type UserResponse struct {
    Name  string `json:"name"`
    Email string `json:"email"`
}

// Регистрируем типизированный хендлер
rpcSvc.AddRPCHandler("users.get", func(ctx natsrpc.RPCContext) error {
    var req UserRequest
    
    // Request() сам распарсит JSON и вернет ошибку, если пейлоад битый
    if err := ctx.Request(&req); err != nil {
        return err
    }

    // Эмуляция бизнес-логики
    user, found := database.FindUser(req.ID)
    if !found {
        // Мы можем вернуть ошибку, и вызывающая сторона её получит
        return fmt.Errorf("user not found")
    }

    // Отправляем ответ. Библиотека сама замаршалит структуру в JSON
    return ctx.Ok(UserResponse{
        Name:  user.Name,
        Email: user.Email,
    })
})

А вот как мы вызываем этот метод из другого микросервиса:

var response UserResponse
err := rpcSvc.CallRPC(ctx, "users.get", UserRequest{ID: "101"}, &response)

Никаких таймаутов подписки, никаких interface{}. Чистый, типизированный вызов.

Middleware и Группировка: Организуем код правильно

Когда проект растет, сваливать все маршруты в кучу — плохая идея. Мы внедрили концепцию Групп и Middleware, вдохновленную HTTP-роутерами.

Это позволяет нам, например, добавить логирование или проверку прав доступа только для определенной группы команд.

// Создаем группу маршрутов для админки
adminGroup := rpcSvc.Group("admin")

// Добавляем Middleware, который будет работать для всех ручек в этой группе
adminGroup.Use(func(next natsrpc.RPCHandler) natsrpc.RPCHandler {
    return func(ctx natsrpc.RPCContext) error {
        log.Println("Accessing admin area...")
        // Тут могла быть проверка токена
        return next(ctx)
    }
})

// Регистрируем хендлер: итоговый топик будет "my-service.admin.delete-user"
// (при условии, что base route был "my-service")
adminGroup.AddRPCHandler("delete-user", deleteUserHandler)

JetStream и Типизированные События

Работа с JetStream (JS) сложнее, чем с Core NATS, из-за необходимости управлять Consumer'ами и Ack-политиками. Мы постарались скрыть эту сложность, оставив гибкость.

Мы используем дженерики для строгой типизации событий. Больше никаких опечаток в названиях полей JSON.

type OrderCreated struct {
    OrderID string  `json:"order_id"`
    Amount  float64 `json:"amount"`
}

// Подписываемся на JetStream событие
natsevents.AddTypedJetStreamJsonEventHandler(
    eventsSvc,
    "orders.created",
    func(ctx natsevents.EventContext[jetstream.Msg, any], event OrderCreated) error {
        
        log.Printf("Обработка заказа: %s на сумму %.2f", event.OrderID, event.Amount)
        
        // Обратите внимание: если функция возвращает nil, 
        // библиотека автоматически отправит Ack сообщению!
        return nil
    },
    // Опционально: указываем Durable Consumer и Queue Group для масштабирования
    natsevents.WithJetStreamHandlerQueue("order-processor-queue"),
)

Килер-фича: Библиотека сама управляет Ack. Если ваш хендлер вернул ошибку, будет отправлен Nak (отрицательное подтверждение), и сообщение будет доставлено повторно согласно политикам JetStream.

Graceful Shutdown: Выходим красиво

В мире Kubernetes поды умирают и рождаются постоянно. Нельзя просто "убить" процесс — нужно дать текущим обработчикам завершить работу.

deez-nats реализует правильный паттерн завершения. Метод Shutdown:

  1. Перестает принимать новые сообщения.

  2. Ждет завершения активных хендлеров.

  3. Закрывает подписки и соединение.

// В main.go
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
<-sigChan

shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

// Спокойно завершаем работу сервисов
if err := rpcSvc.Shutdown(shutdownCtx); err != nil {
    log.Printf("Error shutting down RPC: %v", err)
}

Итог

Мы создали deez-nats не как учебный проект, а как инструмент для решения реальных проблем в продакшене. Он позволил нам сократить объем кода в микросервисах, уменьшить количество ошибок, связанных с типами, и стандартизировать подход к общению между сервисами.

Библиотека поддерживает кастомные маршалеры (хотите Protobuf вместо JSON? Пожалуйста!) и полностью совместима с нативным клиентом nats.go.

Попробуйте внедрить её в свой проект: GitHub: leinodev/deez-nats

Будем рады вашим звездам, ишью и пулл-реквестам. Давайте делать Go-экосистему лучше вместе! 🥜

Хотите использовать похожую технологию?

Наша команда разрабатывает веб-приложения, ботов, видеосервисы и интеграции с ИИ с нуля.