Как мы укротили NATS в Go: Представляем deez-nats — наш ответ на сложность микросервисов
От разработчиков для разработчиков: почему мы устали писать бойлерплейт и создали свой фреймворк.
Привет! На связи команда разработки.
В нашей архитектуре мы делаем большую ставку на NATS. Это фантастическая технология: быстрая, надежная, с поддержкой как простых Pub/Sub (Core), так и персистентных стримов (JetStream). Но по мере роста количества микросервисов мы столкнулись с проблемой, знакомой многим go-разработчикам.
Мы тратили 30% времени на бизнес-логику и 70% — на "обвязку" NATS. Бесконечные nc.Subscribe, ручной маршалинг JSON, обработка ошибок, управление таймаутами в RPC-вызовах... Код раздувался, а читаемость падала.
Нам нужен был инструмент, который бы давал удобство HTTP-фреймворков (вроде Gin или Echo), но для асинхронного месседжинга. Не найдя идеального решения, мы написали своё.
Встречайте deez-nats — библиотеку, которая превращает работу с NATS в удовольствие.
Философия deez-nats
Когда мы проектировали эту библиотеку, мы ставили во главу угла три принципа:
-
Developer Experience (DX): API должен быть интуитивным.
-
Type Safety: Мы любим Go за типизацию, поэтому библиотека активно использует дженерики (Generics).
-
Production Ready: Graceful shutdown, middleware и контексты — это база, а не опция.
Давайте посмотрим, как это работает на практике.
Установка и Базовая настройка
Всё начинается стандартно. Библиотека требует Go 1.21+, так как мы используем современные фичи языка.
go get github.com/leinodev/deez-nats
В точке входа (main.go) мы инициализируем два основных сервиса: один для RPC (синхронные вызовы request-reply), другой — для событий.
package main
import (
"context"
"log"
"github.com/nats-io/nats.go"
"github.com/leinodev/deez-nats/natsevents"
"github.com/leinodev/deez-nats/natsrpc"
)
func main() {
// Подключение к "голому" NATS
nc, _ := nats.Connect(nats.DefaultURL)
defer nc.Close()
// Инициализация наших врапперов
// Мы можем передать опции конфигурации прямо сюда
rpcSvc := natsrpc.New(nc, natsrpc.WithBaseRoute("my-service"))
eventsSvc := natsevents.New(nc)
// ... регистрация хендлеров ...
ctx := context.Background()
// Запускаем слушателей в отдельных горутинах
go rpcSvc.StartWithContext(ctx)
go eventsSvc.StartWithContext(ctx)
select {} // Блокируем main
}
RPC: Забываем о ручном Publish
В чистом NATS реализация паттерна Request-Reply требует ручного управления Reply топиком. Мы автоматизировали это через абстракцию RPCContext.
Вот как выглядит типичный обработчик в нашем продакшене:
type UserRequest struct {
ID string `json:"id"`
}
type UserResponse struct {
Name string `json:"name"`
Email string `json:"email"`
}
// Регистрируем типизированный хендлер
rpcSvc.AddRPCHandler("users.get", func(ctx natsrpc.RPCContext) error {
var req UserRequest
// Request() сам распарсит JSON и вернет ошибку, если пейлоад битый
if err := ctx.Request(&req); err != nil {
return err
}
// Эмуляция бизнес-логики
user, found := database.FindUser(req.ID)
if !found {
// Мы можем вернуть ошибку, и вызывающая сторона её получит
return fmt.Errorf("user not found")
}
// Отправляем ответ. Библиотека сама замаршалит структуру в JSON
return ctx.Ok(UserResponse{
Name: user.Name,
Email: user.Email,
})
})
А вот как мы вызываем этот метод из другого микросервиса:
var response UserResponse
err := rpcSvc.CallRPC(ctx, "users.get", UserRequest{ID: "101"}, &response)
Никаких таймаутов подписки, никаких interface{}. Чистый, типизированный вызов.
Middleware и Группировка: Организуем код правильно
Когда проект растет, сваливать все маршруты в кучу — плохая идея. Мы внедрили концепцию Групп и Middleware, вдохновленную HTTP-роутерами.
Это позволяет нам, например, добавить логирование или проверку прав доступа только для определенной группы команд.
// Создаем группу маршрутов для админки
adminGroup := rpcSvc.Group("admin")
// Добавляем Middleware, который будет работать для всех ручек в этой группе
adminGroup.Use(func(next natsrpc.RPCHandler) natsrpc.RPCHandler {
return func(ctx natsrpc.RPCContext) error {
log.Println("Accessing admin area...")
// Тут могла быть проверка токена
return next(ctx)
}
})
// Регистрируем хендлер: итоговый топик будет "my-service.admin.delete-user"
// (при условии, что base route был "my-service")
adminGroup.AddRPCHandler("delete-user", deleteUserHandler)
JetStream и Типизированные События
Работа с JetStream (JS) сложнее, чем с Core NATS, из-за необходимости управлять Consumer'ами и Ack-политиками. Мы постарались скрыть эту сложность, оставив гибкость.
Мы используем дженерики для строгой типизации событий. Больше никаких опечаток в названиях полей JSON.
type OrderCreated struct {
OrderID string `json:"order_id"`
Amount float64 `json:"amount"`
}
// Подписываемся на JetStream событие
natsevents.AddTypedJetStreamJsonEventHandler(
eventsSvc,
"orders.created",
func(ctx natsevents.EventContext[jetstream.Msg, any], event OrderCreated) error {
log.Printf("Обработка заказа: %s на сумму %.2f", event.OrderID, event.Amount)
// Обратите внимание: если функция возвращает nil,
// библиотека автоматически отправит Ack сообщению!
return nil
},
// Опционально: указываем Durable Consumer и Queue Group для масштабирования
natsevents.WithJetStreamHandlerQueue("order-processor-queue"),
)
Килер-фича: Библиотека сама управляет Ack. Если ваш хендлер вернул ошибку, будет отправлен Nak (отрицательное подтверждение), и сообщение будет доставлено повторно согласно политикам JetStream.
Graceful Shutdown: Выходим красиво
В мире Kubernetes поды умирают и рождаются постоянно. Нельзя просто "убить" процесс — нужно дать текущим обработчикам завершить работу.
deez-nats реализует правильный паттерн завершения. Метод Shutdown:
-
Перестает принимать новые сообщения.
-
Ждет завершения активных хендлеров.
-
Закрывает подписки и соединение.
// В main.go
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
<-sigChan
shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// Спокойно завершаем работу сервисов
if err := rpcSvc.Shutdown(shutdownCtx); err != nil {
log.Printf("Error shutting down RPC: %v", err)
}
Итог
Мы создали deez-nats не как учебный проект, а как инструмент для решения реальных проблем в продакшене. Он позволил нам сократить объем кода в микросервисах, уменьшить количество ошибок, связанных с типами, и стандартизировать подход к общению между сервисами.
Библиотека поддерживает кастомные маршалеры (хотите Protobuf вместо JSON? Пожалуйста!) и полностью совместима с нативным клиентом nats.go.
Попробуйте внедрить её в свой проект: GitHub: leinodev/deez-nats
Будем рады вашим звездам, ишью и пулл-реквестам. Давайте делать Go-экосистему лучше вместе! 🥜