Перейти к содержимому

26. Событийный конвейер

Предыдущая глава моделировала request/response SaaS-бэкенд, где доминирующий стиль — синхронные вызовы. В этой главе берётся система, устроенная иначе: событийный конвейер обработки данных, где почти каждое взаимодействие асинхронно, где модули не вызывают друг друга напрямую и где архитектурные вопросы другие.

Конкретно: аналитический конвейер. Сырые события приходят от продуктовых поверхностей, обогащаются атрибутами пользователя, фильтруются и агрегируются, попадают в хранилище и в дашборд реального времени. Важная форма: большинство модулей — это подписчики, читающие из источников, а не вызывающие, обращающиеся к провайдерам.

Что мы моделируем

ProductApp ──► RawEvents
StoreFront ──► RawEvents
└─► Enricher.Enrich (subscribes)
└─► EnrichedEvents
├─► Aggregator.Roll (subscribes)
│ │
│ └─► Metrics.Push
└─► Warehouse.Land (subscribes)
└─► Dashboard.Refresh (subscribes)

Шесть модулей: два эмиттера (ProductApp, StoreFront), один преобразователь (Enricher), один агрегатор, одно хранилище, один дашборд. Два разных событийных интерфейса (RawEvents, EnrichedEvents). Каждая межмодульная связь асинхронна.

Раскладка пакета

analytics/
├── package.archspace
├── sources.arch # ProductApp, StoreFront (event emitters)
├── pipeline.arch # Enricher, Aggregator (transformers)
├── sinks.arch # Warehouse, Dashboard
└── views.arch

Манифест:

name: acme.analytics
version: "0.1.0"
use * from arch.modules
use * from arch.kinds

Модули

sources.arch:

frontend ProductApp {
team: Product
labels {
domain: Analytics
security.zone: Internal
}
"Customer-facing product. Emits clickstream and lifecycle events."
event RawEvents {
"page_view, click, purchase, signup, churn"
}
}
frontend StoreFront {
team: Frontend
labels {
domain: Analytics
security.zone: Internal
}
"E-commerce frontend. Emits its own clickstream into the same pipeline."
event RawEvents
}

Два эмиттера, оба публикуют RawEvents. Форма событий у них разная, но вид интерфейса (event) и тема (RawEvents на каждом модуле) — это то, относительно чего разрешается разводка: подписчики видят каждое как отдельный <Module>.RawEvents.

pipeline.arch:

service Enricher {
team: Data
labels {
domain: Analytics
security.zone: Internal
}
"Joins raw events with user profile data. Emits enriched events downstream."
command Enrich {
"Subscribed to every RawEvents source."
subscribes: ProductApp.RawEvents
}
event EnrichedEvents {
"Raw event + resolved user_id, session_id, plan_tier."
}
}
service Aggregator {
team: Data
labels {
domain: Analytics
security.zone: Internal
}
"Rolls events into minute / hour / day buckets. Pushes to Metrics."
command Roll {
"Subscribed to the enriched stream."
subscribes: Enricher.EnrichedEvents
}
}
service Metrics {
team: Data
labels {
domain: Analytics
security.zone: Internal
}
"Tier-1 metrics surface — counters and rollups."
command Push { "Receive rolled-up metric increments from Aggregator." }
}

Обратите внимание: Enricher.Enrich подписывается на RawEvents одного источника. У конвейера два источника (ProductApp.RawEvents и StoreFront.RawEvents) — оба требуют разводки. Два варианта:

Вариант A: один обработчик-подписчик на каждый источник.

service Enricher {
command EnrichProduct { subscribes: ProductApp.RawEvents }
command EnrichStore { subscribes: StoreFront.RawEvents }
event EnrichedEvents
}

Вариант B: одна каноническая тема событий, источники публикуют в общую. Требует локального для проекта вида интерфейса, который фиксирует “это событие участвует в аналитической теме”. Тоньше — разбирается в Главе 29, когда мы говорим про дизайн метамодели.

Для этой главы — вариант A: явные обработчики на каждый источник. Это понятнее, и валидатор всё ловит.

sinks.arch:

database Warehouse {
team: Data
labels {
domain: Analytics
data.classification: pii
}
"Long-term analytical store. Append-only, partitioned by event date."
command Land {
"Append enriched events to the canonical table."
subscribes: Enricher.EnrichedEvents
}
event LandedEvents {
"Emitted after a successful Land. Downstream consumers (Dashboard) subscribe here."
}
}
frontend Dashboard {
team: Data
labels {
domain: Analytics
security.zone: Internal
}
"Real-time analytics dashboard. Subscribes to warehouse landings for live refresh."
command Refresh {
"Refresh active visualizations when new data lands."
subscribes: Warehouse.LandedEvents
}
}

Warehouse — вид database (с обязательной labels.data.classification из стандартной библиотеки). Land — входящая команда, подписанная на обогащённые события; LandedEvents — исходящее событие, на которое подписан Dashboard ниже по течению. Паттерн с двумя интерфейсами — один на приём, один на трансляцию — это правильная форма, когда узел и потребляет, и эмитит.

Ошибки, которых стоит избегать. Не вводите фиктивный модуль “EventBus” или “Broker” только для того, чтобы привязать подписки. Разводка уже живёт на обработчике. Добавление модуля-брокера раздувает диаграмму узлом, который не представляет архитектурный элемент — он представляет реализацию. Если в вашей реальной архитектуре есть управляемый брокер (Kafka, SNS), моделируйте его как external_system только если ваши сервисы явно вызывают его административный API. Плоскость данных через него невидима для модели; это правильно.

Процессы для редких синхронных потоков

Даже в событийном конвейере бывают синхронные моменты — обычно административные:

process #flow01 BackfillReplay {
DataEngineer > Warehouse.Land : "manual replay of archived events"
}
process #flow02 DashboardLoad {
User > Dashboard.Refresh : "initial page load"
}

Два процесса на две синхронные точки соприкосновения. Сам поток данных здесь не зафиксирован — он фиксируется разводкой через subscribes:, которую рендерер автоматически соединяет в диаграмме.

Почему бы не написать огромный процесс, охватывающий весь конвейер? Процессы кодируют причинность через явный порядок шагов. Причинность конвейера уже закодирована в разводке через подписки: Aggregator.Roll происходит потому что сработал Enricher.EnrichedEvents, а не потому что какой-то процесс сказал “Aggregator идёт после Enricher”. Запихивание этого в процесс дублирует информацию и без необходимости ограничивает диаграмму.

Проекции

view PipelineDataflow {
focus domain: Analytics
layout dagre
"Full pipeline — sources, transformers, sinks. Async edges show subscribe wiring."
}
view PIIScope {
focus data.classification: pii
group by team
layout elk
"Every module touching PII data. Used for data-classification audits."
}

PIIScope работает благодаря метке data.classification: pii на Warehouse. Каскад протаскивает её ко всему вложенному; focus-фильтр делает остальное.

Как выглядит событийная модель

После валидации:

  • Шесть модулей, два событийных интерфейса, четыре асинхронных обработчика, разведённых через subscribes:.
  • Диаграмма с направленными рёбрами, следующими за потоком данных, — источники сверху, приёмники снизу, пунктирные рёбра для асинхронных связей.
  • Две проекции: dataflow для инженеров, область PII для governance.

Граф зависимостей неявно живёт в разводке через подписки. Если в SaaS-бэкенде (Глава 25) каждая зависимость фиксировалась шагом процесса, то у этого конвейера большинство зависимостей — в декларациях подписчиков. Оба варианта верны — выбирайте поверхность, которая соответствует тому, как система реально работает.

Решения, с которыми вы столкнётесь

Один обработчик на источник vs. общая тема. На каждый источник — это явнее и дружественнее к валидатору; общая тема — более идиоматично для реальных брокеров в стиле Kafka. Если у вас много источников, публикующих одинаковую форму события, определите локальный для проекта вид интерфейса (Глава 29), чтобы форма была задокументирована в одном месте.

Где живёт брокер? Как выше: обычно нигде в канонической модели. Брокеры — это реализация. Исключение: когда ваши сервисы вызывают административный API брокера (создают темы, программно перечисляют подписчиков), брокер становится external_system с этими административными командами в виде интерфейсов.

Моделирование разветвления. Не перечисляйте каждого подписчика на источнике — пусть каждый подписчик объявляет себя сам. Источник публикует; подписчики разводятся. Это сохраняет источник развязанным от того, как его потребляют.

Backpressure и повторные попытки. Не в модели. Это операционные заботы. Модель фиксирует кто что потребляет; насколько надёжно — это другой артефакт (документ SLO, runbook по реакции на инциденты).

Метки, которые перетекают. domain: Analytics стоит здесь на каждом модуле; вы могли бы каскадировать её из контейнера system Analytics { ... }, в котором держатся все шесть модулей. Вводить такой контейнер или нет — вопрос вкуса: на диаграмме он чисто читается как один кластер, но добавляет один уровень вложенности в исходнике.

Итог

  • Событийные системы моделируют асинхронный поток через subscribes: на обработчиках, а не через шаги процесса.
  • Обработчики на каждый источник (вариант A) понятнее; общие темы требуют локального для проекта вида интерфейса.
  • Не моделируйте брокер — его плоскость данных невидима для архитектуры.
  • Используйте процессы для оставшихся синхронных моментов (повторы, загрузки дашборда, административные операции).
  • Проекции для потока данных (layout dagre) и областей governance (PII, security-зоны) работают напрямую от каскадирующих меток.

Что дальше

Глава 27: Внешняя интеграция → — ваша система плюс сторонний API плюс их webhooks, точно моделируем границу.