26. Событийный конвейер
Предыдущая глава моделировала request/response SaaS-бэкенд, где доминирующий стиль — синхронные вызовы. В этой главе берётся система, устроенная иначе: событийный конвейер обработки данных, где почти каждое взаимодействие асинхронно, где модули не вызывают друг друга напрямую и где архитектурные вопросы другие.
Конкретно: аналитический конвейер. Сырые события приходят от продуктовых поверхностей, обогащаются атрибутами пользователя, фильтруются и агрегируются, попадают в хранилище и в дашборд реального времени. Важная форма: большинство модулей — это подписчики, читающие из источников, а не вызывающие, обращающиеся к провайдерам.
Что мы моделируем
ProductApp ──► RawEventsStoreFront ──► RawEvents │ └─► Enricher.Enrich (subscribes) │ └─► EnrichedEvents │ ├─► Aggregator.Roll (subscribes) │ │ │ └─► Metrics.Push │ └─► Warehouse.Land (subscribes) │ └─► Dashboard.Refresh (subscribes)Шесть модулей: два эмиттера (ProductApp, StoreFront), один преобразователь (Enricher), один агрегатор, одно хранилище, один дашборд. Два разных событийных интерфейса (RawEvents, EnrichedEvents). Каждая межмодульная связь асинхронна.
Раскладка пакета
analytics/├── package.archspace├── sources.arch # ProductApp, StoreFront (event emitters)├── pipeline.arch # Enricher, Aggregator (transformers)├── sinks.arch # Warehouse, Dashboard└── views.archМанифест:
name: acme.analyticsversion: "0.1.0"
use * from arch.modulesuse * from arch.kindsМодули
sources.arch:
frontend ProductApp { team: Product
labels { domain: Analytics security.zone: Internal }
"Customer-facing product. Emits clickstream and lifecycle events."
event RawEvents { "page_view, click, purchase, signup, churn" }}
frontend StoreFront { team: Frontend
labels { domain: Analytics security.zone: Internal }
"E-commerce frontend. Emits its own clickstream into the same pipeline."
event RawEvents}Два эмиттера, оба публикуют RawEvents. Форма событий у них разная, но вид интерфейса (event) и тема (RawEvents на каждом модуле) — это то, относительно чего разрешается разводка: подписчики видят каждое как отдельный <Module>.RawEvents.
pipeline.arch:
service Enricher { team: Data
labels { domain: Analytics security.zone: Internal }
"Joins raw events with user profile data. Emits enriched events downstream."
command Enrich { "Subscribed to every RawEvents source." subscribes: ProductApp.RawEvents }
event EnrichedEvents { "Raw event + resolved user_id, session_id, plan_tier." }}
service Aggregator { team: Data
labels { domain: Analytics security.zone: Internal }
"Rolls events into minute / hour / day buckets. Pushes to Metrics."
command Roll { "Subscribed to the enriched stream." subscribes: Enricher.EnrichedEvents }}
service Metrics { team: Data
labels { domain: Analytics security.zone: Internal }
"Tier-1 metrics surface — counters and rollups."
command Push { "Receive rolled-up metric increments from Aggregator." }}Обратите внимание: Enricher.Enrich подписывается на RawEvents одного источника. У конвейера два источника (ProductApp.RawEvents и StoreFront.RawEvents) — оба требуют разводки. Два варианта:
Вариант A: один обработчик-подписчик на каждый источник.
service Enricher { command EnrichProduct { subscribes: ProductApp.RawEvents } command EnrichStore { subscribes: StoreFront.RawEvents } event EnrichedEvents}Вариант B: одна каноническая тема событий, источники публикуют в общую. Требует локального для проекта вида интерфейса, который фиксирует “это событие участвует в аналитической теме”. Тоньше — разбирается в Главе 29, когда мы говорим про дизайн метамодели.
Для этой главы — вариант A: явные обработчики на каждый источник. Это понятнее, и валидатор всё ловит.
sinks.arch:
database Warehouse { team: Data
labels { domain: Analytics data.classification: pii }
"Long-term analytical store. Append-only, partitioned by event date."
command Land { "Append enriched events to the canonical table." subscribes: Enricher.EnrichedEvents }
event LandedEvents { "Emitted after a successful Land. Downstream consumers (Dashboard) subscribe here." }}
frontend Dashboard { team: Data
labels { domain: Analytics security.zone: Internal }
"Real-time analytics dashboard. Subscribes to warehouse landings for live refresh."
command Refresh { "Refresh active visualizations when new data lands." subscribes: Warehouse.LandedEvents }}Warehouse — вид database (с обязательной labels.data.classification из стандартной библиотеки). Land — входящая команда, подписанная на обогащённые события; LandedEvents — исходящее событие, на которое подписан Dashboard ниже по течению. Паттерн с двумя интерфейсами — один на приём, один на трансляцию — это правильная форма, когда узел и потребляет, и эмитит.
Ошибки, которых стоит избегать. Не вводите фиктивный модуль “EventBus” или “Broker” только для того, чтобы привязать подписки. Разводка уже живёт на обработчике. Добавление модуля-брокера раздувает диаграмму узлом, который не представляет архитектурный элемент — он представляет реализацию. Если в вашей реальной архитектуре есть управляемый брокер (Kafka, SNS), моделируйте его как
external_systemтолько если ваши сервисы явно вызывают его административный API. Плоскость данных через него невидима для модели; это правильно.
Процессы для редких синхронных потоков
Даже в событийном конвейере бывают синхронные моменты — обычно административные:
process #flow01 BackfillReplay { DataEngineer > Warehouse.Land : "manual replay of archived events"}
process #flow02 DashboardLoad { User > Dashboard.Refresh : "initial page load"}Два процесса на две синхронные точки соприкосновения. Сам поток данных здесь не зафиксирован — он фиксируется разводкой через subscribes:, которую рендерер автоматически соединяет в диаграмме.
Почему бы не написать огромный процесс, охватывающий весь конвейер? Процессы кодируют причинность через явный порядок шагов. Причинность конвейера уже закодирована в разводке через подписки:
Aggregator.Rollпроисходит потому что сработалEnricher.EnrichedEvents, а не потому что какой-то процесс сказал “Aggregator идёт после Enricher”. Запихивание этого в процесс дублирует информацию и без необходимости ограничивает диаграмму.
Проекции
view PipelineDataflow { focus domain: Analytics layout dagre "Full pipeline — sources, transformers, sinks. Async edges show subscribe wiring."}
view PIIScope { focus data.classification: pii group by team layout elk "Every module touching PII data. Used for data-classification audits."}PIIScope работает благодаря метке data.classification: pii на Warehouse. Каскад протаскивает её ко всему вложенному; focus-фильтр делает остальное.
Как выглядит событийная модель
После валидации:
- Шесть модулей, два событийных интерфейса, четыре асинхронных обработчика, разведённых через
subscribes:. - Диаграмма с направленными рёбрами, следующими за потоком данных, — источники сверху, приёмники снизу, пунктирные рёбра для асинхронных связей.
- Две проекции: dataflow для инженеров, область PII для governance.
Граф зависимостей неявно живёт в разводке через подписки. Если в SaaS-бэкенде (Глава 25) каждая зависимость фиксировалась шагом процесса, то у этого конвейера большинство зависимостей — в декларациях подписчиков. Оба варианта верны — выбирайте поверхность, которая соответствует тому, как система реально работает.
Решения, с которыми вы столкнётесь
Один обработчик на источник vs. общая тема. На каждый источник — это явнее и дружественнее к валидатору; общая тема — более идиоматично для реальных брокеров в стиле Kafka. Если у вас много источников, публикующих одинаковую форму события, определите локальный для проекта вид интерфейса (Глава 29), чтобы форма была задокументирована в одном месте.
Где живёт брокер? Как выше: обычно нигде в канонической модели. Брокеры — это реализация. Исключение: когда ваши сервисы вызывают административный API брокера (создают темы, программно перечисляют подписчиков), брокер становится external_system с этими административными командами в виде интерфейсов.
Моделирование разветвления. Не перечисляйте каждого подписчика на источнике — пусть каждый подписчик объявляет себя сам. Источник публикует; подписчики разводятся. Это сохраняет источник развязанным от того, как его потребляют.
Backpressure и повторные попытки. Не в модели. Это операционные заботы. Модель фиксирует кто что потребляет; насколько надёжно — это другой артефакт (документ SLO, runbook по реакции на инциденты).
Метки, которые перетекают. domain: Analytics стоит здесь на каждом модуле; вы могли бы каскадировать её из контейнера system Analytics { ... }, в котором держатся все шесть модулей. Вводить такой контейнер или нет — вопрос вкуса: на диаграмме он чисто читается как один кластер, но добавляет один уровень вложенности в исходнике.
Итог
- Событийные системы моделируют асинхронный поток через
subscribes:на обработчиках, а не через шаги процесса. - Обработчики на каждый источник (вариант A) понятнее; общие темы требуют локального для проекта вида интерфейса.
- Не моделируйте брокер — его плоскость данных невидима для архитектуры.
- Используйте процессы для оставшихся синхронных моментов (повторы, загрузки дашборда, административные операции).
- Проекции для потока данных (
layout dagre) и областей governance (PII, security-зоны) работают напрямую от каскадирующих меток.
Что дальше
Глава 27: Внешняя интеграция → — ваша система плюс сторонний API плюс их webhooks, точно моделируем границу.