Перейти до основного вмісту

Брокер повідомлень

інформація

Документ "Брокер" відповідає за опис слухачів amqp повідомлень. Він визначає назви повідомлень, структуру цих повідомлень, а також слухачів цих повідомлень. Брокер повідомлень відіграє ключову роль при побудові взаємодії між сервісами.

Архітектура

Робота з повідомленнями має два напрямки - прослуховування повідомлень та генерація повідомлень.

Прослуховування

При необхідності опису слухачів повідомлень в предметній області необхідно створити опис документа "Брокер" з переліком необхідних обробників.

Документ "Брокер" повинен бути зареєстрований в документі "Реєстр" предметної області, яка в свою чергу, має бути зареєстрована в відповідному сервісі, включеному до бізнес-схеми. Це гарантує, що при запуску ядра обчислень будуть завантажені всі слухачі повідомлень. При завантаженні бізнес-схеми всі слухачі запускаються, що надає можливість прослуховування повідомлень.

broker-arch.svg

інформація

На відміну від адаптерів протоколів, джерелом взаємодії є спілкування через http чи ws протоколи, брокер повідомлень взаємодії з сервером брокера повідомлень. X-Fiber використовує RabbitMQ, як сервер брокера повідомлень. Коли створюється повідомлення, воно надається серверу брокера повідомлень, який вже сам направляє на відповідні сервера, які прослуховують відповідний тип повідомлення.

Такий механізм взаємодії закладає архітектурно слідуючий життєвий цикл, за яким працює X-Fiber:

  1. Завантажити бізнес-схеми, а саме, завантаження опису повідомлень з їх обробниками.
  2. Створити зʼєднання з сервером брокера повідомлень.
  3. Підписатись на всі повідомлення, які описані в бізнес-схемі.

Генерація

небезпека

Опис генерації повідомлень знаходиться в активній розробці.

Склад

Брокер повідомлень поєднує дві фунції - опис деталей слухача повідомлень та створення обробника повідомлень.

Загальна структура брокера

Брокер повідомлень описується як key-value обʼєкт, де key назва повідомлення, а value опис деталей обробки цього повідомлення. Деталі повідомлення являють собою різновид опцій, в залежності від типу взаємодії, а також обробник цього повідомлення.

type CommunicationKind = 'queue' | 'exchange';
type AuthScope = "public" | "private";
type Version = "v1" | "v2" | "v3" | "v4" | "v5" | string;

export type BrokerHandler = (
...args // ... handler args
) => Promise<void>

type QueueOptions = {
// ... options structure
}

type ConsumeOptions = {
// ... options structure
}

type ExchangeOptions = {
// ... options structure
}

interface BaseTopic {
communication: CommunicationKind;
scope: AuthScope;
version: Version;
handler: BrokerHandler;
}

interface QueueTopic extends BaseTopic {
type: "queue";
queue?: QueueOptions;
consume?: ConsumeOptions;
}

interface ExchangeTopic extends BaseTopic {
type: "exchange";
exchange?: ExchangeOptions;
consume?: ConsumeOptions;
}

type Topic = QueueTopic | ExchangeTopic;

type BrokerStructure<T extends string = string> = {
[key in T]: Topic;
};

де:

  • CommunicationKind - можливий тип взаємодії. Підтримуються наразі слідуючі типи взаємодії: queue - взаємодія через черги, та exchange взаємодія через обмінники повідомлень.
  • Handler - тип обробника повідомлень.
  • QueueOptions - опції налаштування черги.
  • ConsumeOptions - опції налаштування слухача черги.
  • ExchangeOptions - опції налаштування обмінника.
  • Topic - структура деталей повідомлення.
  • BrokerStructure - структура брокера повідомлень.

Деталі повідомлення

Деталі повідомлення складаються з:

  1. Типу комунікації (за замовчуванням queue).
  2. Типу приватизації (за замовчуванням public).
  3. Версії повідомлення (за замовчуванням v1).
  4. Опції налаштування слухача черги (за замовчуванням {}). В
  5. Опції налаштування черги (за замовчуванням { durable: true }). Доступний коли тип комунікації "queue"
  6. Опції налаштування обмінника (за замовчуванням {}). Доступний коли тип комунікації "exchange"

Тип комунікації

небезпека

Опис комунікації через черги та через обмінники повідомлень знаходиться в активній розробці.

Тип приватизації

небезпека

Опис типів приватизації знаходиться в активній розробці.

Версія

інформація

Версія слухача повідомлень дозволяє відокремити минулі реалізації слухачів з якими працюють відповідні джерела взаємодії від нової, що спрощує підтримку API без потреби створювати слухачі аналогічного призначення з словами синонімами.

За замовчуванням кінцевому маршруту присвоюється версія v1. Окремо варто підкреслити, що версії потрібно вказувати v1 / v2 / v3 / v4 і т.д.

Опції налаштування слухача черги

небезпека

Опис опцій налаштування слухача черги знаходиться в активній розробці

Опції налаштування черги

небезпека

Опис опцій налаштування черги знаходиться в активній розробці

Опції налаштування обмінника

небезпека

Опис опцій налаштування обмінника знаходиться в активній розробці

Обробник повідомлення

Обробник повідомлення являє собою асинхронну функцію, яка складається з 3х аргументів та нічого не повертає

import { RabbitMQ, Agents, Context } from '@x-fiber/proton'

export type BrokerHandler = (
msg: RabbitMQ.Message,
agents: Agents,
context: Context
) => Promise<void>;

де:

  • BrokerHandler - структура обробника повідомлення.
  • msg - структура повідомлення, яка дорівнює опису структури з бібліотеки amqplib
  • Agents - перелік агентів функціональності.
  • Context - контекст виконання запиту.

Використання композиції

X-Fiber виділяє окремі документи з можливостями використання в цих спеціалізованих інструментів, наприклад в документі "Репозиторій" наявний провайдер з переліком методів для створення запитів до бази даних. Таке рішення прийняте навмисно, щоб ділити бізнес-логіки на окремі структурні складові, а при описі обробника запита використовувати підхід "Композиція"

handler-execution-process.svg

Аргумент повідомлення

Аргументами запиту виступає обʼєкт повідомлення, який слухач очікує при отриманні повідомлення. Деталі опису структури повідомлення описані в amqplib.

Аргумент агентів функціональності

Аргумент агентів функціональності являє собою обʼєкт з переліком агентів:

type IFunctionalityAgent = {
// agent functionality description
}

type ISchemaAgent = {
// agent functionality description
}

type IIntegrationAgent = {
// agent functionality description
}

export type Agents = {
fnAgent: IFunctionalityAgent;
schemaAgent: ISchemaAgent;
inAgent: IIntegrationAgent;
};

де:

  • fnAgent - агент з переліком просторів імен функціональності, які надають функціональні компоненти ядра обчислень.
  • schemaAgent - агент з переліком функціональності для доступу до інших складових бізнес-схеми.
  • inAgent - агент з переліком просторів імен функціональності, які надають інтеграційні рішення ядра обчислень.

Деталі структури кожного агента функціональності описується в розділі "Агенти"

Аргумент контексту виконання

інформація

Контекст виконання має базову частину, яка складається зі сховища загальної інформації, унікального ідентифікатора запиту, знімку бізнес-схеми тощо. Окрім базової частини, контекст виконання може містити інформацію про систему надсилання, в випадку типу приватизації "private". Приватизація повідомлень необхідна при ідентифікації системи та контролю прав доступу. При обробці запиту в рамках життєвого циклу виконання, зʼєднувач RabbitMQConnector здійснить валідацію токена доступу, в випадку коли тип приватизації - "private", на основі нього здійснить пошук сесії в Redis сховищі та в разі успіху

  • наповнить контекст виконання даними сесії.

Базовий контекст

небезпека

Опис базового контексту виконання знаходиться в активній розробці

Розширений контекст за рахунок приватизації слухача

небезпека

Опис розширенного контексту виконання за рахунок приватизації слухача знаходиться в активній розробці

Життєвий цикл виконання

небезпека

Опис життєвого циклу виконання знаходиться в активній розробці

Передача файлів між сервісами

примітка

X-Fiber пропонує передачу файлів між сервісами не напряму - надаючи в чергу структуру файлу, а робити це в обхід, оскільки такі запити є тяжкими, а велика їх кількість можуть переповнити обчислювальну памʼять сервера брокера повідомлень.

При потребі передачі файлу між сервісами використовуйте Redis сховище, завдяки простору імен fileStorage (та визначення типу конфігурації файлового сховища - 'redis', який визначається в конфігурації роботи веб-сервера та заповнюється за шляхом 'strategies.fileStorage.type').

file-communication-between-services.png

Завантаження цілком

Завантаження файлу цілком допускається лише коли кількість файлів за визначений час їх життя в Redis буде допустимим для тих обʼємів, які визначаються бізнес-вимогами, або коли самі файли є малими при малому навантажені на сервіс, в іншому випадку необхідно використовувати формат "Завантаження частинами"

При завантаження цілком використовуйте методи без приставки stream:

  1. Здійснити завантаження в Redis.
  2. Передати повідомлення через брокер повідомлень в інший сервіс.
  3. Відвантажити файл в іншому сервісі завдяки його ідентифікатору в Redis.

Завантаження частинами

В випадках, коли сервіс високонавантажений або самі файли мають великий розмір - необхідно відвантажувати та завантажувати файли в Redis частинами (chunks). Такий підхід дозволяє постійно займати в сховищі мегабайти або десятки мегабайт замість повнорозмірного формату. Подібний режим надається Redis з типом stream.

При завантаження цілком використовуйте методи з приставкою stream:

  1. Почати здійснювати завантаження в Redis.
  2. Передати повідомлення через брокер повідомлень в інший сервіс про початок здійснення завантаження.
  3. Відвантажити частини в іншому сервісі завдяки його ідентифікатору в Redis здійснюючи з ними необхідну бізнес-логіку - передати стрім в хмарне сховище, сформувати файл в буфері тощо.

Реалізація брокера повідомлень

небезпека

Опис реалізації брокера повідомлень знаходиться в активній розробці

Реалізація генерації

небезпека

Опис реалізації генерації повідомлень знаходиться в активній розробці

Реєстрація

Щоб успішно використовувати слухачі повідомлень, документ "Брокер" повинен бути зареєстрований в документі "Реєстр" цієї предметної області. Наприклад для предметної області - агрегат "Користувачі", необхідно в опис документів функції setRegistry для встановлення посилання на брокер та інші компоненти модуля:

import { setRegistry } from '@x-fiber/proton';
import { BusUsersAggBroker } from './BusUsers.agg.emitter';

export const BusUsersAggRegistry = setRegistry<'BusUsersAgg'>('BusUsersAgg', {
broker: BusUsersAggBroker,
// ... other documents
});

Власні рішення

warning

X-Fiber виконує обробники запитів в рамках їх життєвого циклу, це означає, що інструменти, які не надає X-Fiber, але необхідні для реалізації бізнес-логіки є можливість поділити на два типи:

  • інструменти без створення стану - використання таких інструментів, наприклад date-fns, як бібліотека для роботи з датами, можуть бути додані в будь-якому місці попередньо встановивши їх в проєкт. X-Fiber рекомендує опис розгортання таких інструментів виносити в окрему директорію від опису бізнес-логіки.
  • інструменти зі створення стану - використання таких практик наразі X-Fiber не покривається, але рішення яке буде реалізовано в найближчих релізах це окрема дві функції - запуску та старту, які будуть запускатись після запуску та зупинки життєвого циклу роботи ядра обчислень. Ця функція може приймати ваші рішення по запуску та зупинки використання стану інструментів.

У разі впровадження власних рішень, рекомендується керуватися кращими практиками з метою поліпшення підтримки кодової бази. Якщо ви вважаєте, що інструмент, який ви використовуєте, має достатню популярність, ви можете запропонувати його включення в саму платформу X-Fiber. Ми готові обговорити цю можливість з вами, а при позитивному рішенні ми включимо цей інструмент до архітектури платформи.