Что такое реконсиляция данных
Реконсиляция — проверка целостности данных в распределенных системах
При разработке и использовании распределенных систем перед нами возникает задача контроля целостности и идентичности данных между системами — задача реконсиляции.
Требования, которые выставляет заказчик — минимальное время данной операции, поскольку чем раньше расхождение будет найдено, тем легче будет устранить его последствия. Задача заметно усложняется тем, что системы находятся в постоянном движении (
100 000 транзакций в час) и добиться 0% расхождений не получится.
Основная идея
Основную идею решения можно описать следующей диаграммой.
Рассмотрим каждый из элементов отдельно.
Адаптеры данных
Каждая из систем создана для своей предметной области и как следствие описания объектов могут существенно различаться. Нам же требуется сравнить только определенный набор полей из этих объектов.
Для упрощения процедуры сравнения, приведем объекты к единому формату, написав для каждого источника данных свой адаптер. Приведение объектов к единому формату, позволяет заметно сократить объем используемой памяти, поскольку хранить мы будем только сравниваемые поля.
Под капотом у адаптера может быть любой источник данных: HttpClient, SqlClient, DynamoDbClient и т.д.
Ниже представлен интерфейс IAdapter, который требуется реализовать:
Хранилище
Реконсиляцию данных можно начинать только после того как все данные прочитаны, поскольку адаптеры могут возвращать их в произвольном порядке.
В таком случае объема оперативной памяти может не хватить, особенно если вы запускаете несколько реконсиляций одновременно, указывая большие временные интервалы.
Рассмотрим интерфейс IStorage
Хранилище. Реализация на базе MS SQL
Мы реализовали IStorage, используя MS SQL, что позволило выполнять сравнение полностью на стороне Db сервера.
Для хранения реконсилируемых значений достаточно создать следующую таблицу:
Каждая запись содержит системные поля ([id], [adapterId]) и поля, по которым осуществляется сравнение ([qty], [price]). Пару слов о системных полях:
[id] — уникальный идентификатор записи, одинаковый в обеих системах
[adapterId] — идентификатор адаптера, через который была получена запись
Так как процессы реконсиляции могут быть запущены параллельно и иметь пересекающиеся интервалы, мы создаем для каждой из них таблицу с уникальным именем. В случае если реконсиляция прошла успешно, данная таблица удаляется, в противном случае отправляется отчет со списком записей, в которых есть расхождения.
Хранилище. Сравнение значений
Представим, что у нас есть 2 множества, элементы которого имеют абсолютно идентичный набор полей. Рассмотрим 4 возможных случая их пересечения:
A. Элементы присутствуют только в левом множестве
B. Элементы присутствуют в обоих множествах, но имеют разные значения
С. Элементы присутствуют только в правом множестве
D. Элементы присутствуют в обоих множествах и имеют одинаковые значения
В конкретной задаче нам требуется найти элементы, описанные в случаях A, B, C. Получить требуемый результат можно за один запрос к MS SQL через FULL OUTER JOIN:
Вывод данного запроса может содержать 4 вида записей, отвечающих исходным требованиям
# | id | adapterid | comment |
---|---|---|---|
1 | guid1 | adp1 | Запись присутствует только в левом множестве. Случай A |
2 | guid2 | adp2 | Запись присутствует только в правом множестве. Случай С |
3 | guid3 | adp1 | Записи присутствует в обоих множествах, но имеют разные значения. Случай B |
4 | guid3 | adp2 | Записи присутствует в обоих множествах, но имеют разные значения. Случай B |
Хранилище. Хэширование
Используя хэширование по сравниваемым объектам, можно значительно удешевить операции записи и сравнения. Особенно, когда заходит речь о сравнении десятков полей.
Наиболее универсальным оказался способ хэширования сериализованного представления объекта.
1. Для хэширования мы используем стандартный метод GetHashCode(), возвращающий int32 и переопределенный для всех примитивных типов.
2. В данном случае вероятность коллизий маловероятна, поскольку сравниваются только записи имеющие одинаковые идентификаторы.
Рассмотрим структуру таблицы, используемую при данной оптимизации:
Преимущество такой структуры — это константная стоимость хранения одной записи (24 байта), которая не будет зависеть от числа сравниваемых полей.
Естественно и процедура сравнения претерпевает свои изменения и становится значительно проще.
Процессор
В данном разделе пойдет речь о классе, содержащем всю бизнес-логику реконсиляции, а именно:
1. параллельное чтение данных из адаптеров
2. хэширование данных
3. буферизованная запись значений в БД
4. выдача результатов
Более комплексное описание процесса реконсиляции можно получить, рассмотрев диаграмму последовательностей и интерфейс IProcessor.
Благодарности
Огромная благодарность моим коллегам из MySale Group за фидбек: AntonStrakhov, Nesstory, Barlog_5, Косте Кривцуну и VeterManve — автору идеи.
Реконсиляция — проверка целостности данных в распределенных системах
При разработке и использовании распределенных систем перед нами возникает задача контроля целостности и идентичности данных между системами — задача реконсиляции.
Требования, которые выставляет заказчик — минимальное время данной операции, поскольку чем раньше расхождение будет найдено, тем легче будет устранить его последствия. Задача заметно усложняется тем, что системы находятся в постоянном движении (
100 000 транзакций в час) и добиться 0% расхождений не получится.
Основная идея
Основную идею решения можно описать следующей диаграммой.
Рассмотрим каждый из элементов отдельно.
Адаптеры данных
Каждая из систем создана для своей предметной области и как следствие описания объектов могут существенно различаться. Нам же требуется сравнить только определенный набор полей из этих объектов.
Для упрощения процедуры сравнения, приведем объекты к единому формату, написав для каждого источника данных свой адаптер. Приведение объектов к единому формату, позволяет заметно сократить объем используемой памяти, поскольку хранить мы будем только сравниваемые поля.
Под капотом у адаптера может быть любой источник данных: HttpClient, SqlClient, DynamoDbClient и т.д.
Ниже представлен интерфейс IAdapter, который требуется реализовать:
Хранилище
Реконсиляцию данных можно начинать только после того как все данные прочитаны, поскольку адаптеры могут возвращать их в произвольном порядке.
В таком случае объема оперативной памяти может не хватить, особенно если вы запускаете несколько реконсиляций одновременно, указывая большие временные интервалы.
Рассмотрим интерфейс IStorage
Хранилище. Реализация на базе MS SQL
Мы реализовали IStorage, используя MS SQL, что позволило выполнять сравнение полностью на стороне Db сервера.
Для хранения реконсилируемых значений достаточно создать следующую таблицу:
Каждая запись содержит системные поля ([id], [adapterId]) и поля, по которым осуществляется сравнение ([qty], [price]). Пару слов о системных полях:
[id] — уникальный идентификатор записи, одинаковый в обеих системах
[adapterId] — идентификатор адаптера, через который была получена запись
Так как процессы реконсиляции могут быть запущены параллельно и иметь пересекающиеся интервалы, мы создаем для каждой из них таблицу с уникальным именем. В случае если реконсиляция прошла успешно, данная таблица удаляется, в противном случае отправляется отчет со списком записей, в которых есть расхождения.
Хранилище. Сравнение значений
Представим, что у нас есть 2 множества, элементы которого имеют абсолютно идентичный набор полей. Рассмотрим 4 возможных случая их пересечения:
A. Элементы присутствуют только в левом множестве
B. Элементы присутствуют в обоих множествах, но имеют разные значения
С. Элементы присутствуют только в правом множестве
D. Элементы присутствуют в обоих множествах и имеют одинаковые значения
В конкретной задаче нам требуется найти элементы, описанные в случаях A, B, C. Получить требуемый результат можно за один запрос к MS SQL через FULL OUTER JOIN:
Вывод данного запроса может содержать 4 вида записей, отвечающих исходным требованиям
#idadapteridcomment1guid1adp1Запись присутствует только в левом множестве. Случай A2guid2adp2Запись присутствует только в правом множестве. Случай С3guid3adp1Записи присутствует в обоих множествах, но имеют разные значения. Случай B4guid3adp2Записи присутствует в обоих множествах, но имеют разные значения. Случай B
Хранилище. Хэширование
Используя хэширование по сравниваемым объектам, можно значительно удешевить операции записи и сравнения. Особенно, когда заходит речь о сравнении десятков полей.
Наиболее универсальным оказался способ хэширования сериализованного представления объекта.
Рассмотрим структуру таблицы, используемую при данной оптимизации:
Преимущество такой структуры — это константная стоимость хранения одной записи (24 байта), которая не будет зависеть от числа сравниваемых полей.
Естественно и процедура сравнения претерпевает свои изменения и становится значительно проще.
Процессор
В данном разделе пойдет речь о классе, содержащем всю бизнес-логику реконсиляции, а именно:
Более комплексное описание процесса реконсиляции можно получить, рассмотрев диаграмму последовательностей и интерфейс IProcessor.
СОДЕРЖАНИЕ
Определение
Следующие два определения даны в Оксфордском словаре бухгалтерского учета.
ii) Процедура подтверждения надежности бухгалтерских записей компании путем регулярного сравнения [сальдо операций]. Сверка счетов может производиться ежедневно, ежемесячно или ежегодно ».
Бухгалтерское программное обеспечение является одним из инструментов, которые организации используют для выполнения этого процесса, таким образом устраняя ошибки и, следовательно, принимая точные решения на основе финансовой информации. Сверка счетов определяет, находятся ли транзакции в правильном месте или их следует перенести на другой счет.
Сверка в бухгалтерском учете важна не только для предприятий, но также может быть удобна для домашних хозяйств и частных лиц. Например, разумно проводить сверку счетов по кредитным картам и чековых книжек на регулярной основе. Это делается путем сравнения квитанций по дебетовым картам или копий чеков с выписками из банковского счета физических лиц.
Методы
Поэтому для обеспечения надежности финансовых отчетов выверки должны выполняться для всех балансовых счетов на регулярной и постоянной основе. Надежный процесс согласования повышает точность функции финансовой отчетности и позволяет финансовому отделу с уверенностью публиковать финансовые отчеты.
Примирение может происходить двумя способами:
В обоих случаях, когда ошибки выявляются в результате сверки, необходимо внести корректировки, чтобы баланс счета соответствовал подтверждающей информации.
Текущая практика
Ручное согласование с автоматизацией
Multiprocessing и реконсиляция данных из различных источников
В условиях многообразия распределенных систем, наличие выверенной информации в целевом хранилище является важным критерием непротиворечивости данных.
На этот счет существует немало подходов и методик, а мы остановимся на реконсиляции, теоретические аспекты которой были затронуты вот в этой статье. Предлагаю рассмотреть практическую реализацию данной системы, масштабируемой и адаптированной под большой объем данных.
Как реализовать этот кейс на старом-добром Python — читаем под катом! Поехали!
Введение
Давайте представим, что финансовая организация имеет несколько распределенных систем и перед нами стоит задача сверить транзакции в этих системах и загрузить сверенные данные в целевое хранилище.
В качестве источника данных возьмем большой текстовый файл и таблицу в базе данных PostgreSQL. Предположим, что данные, находящиеся в этих источниках, имеют одни и те же транзакции, но при этом могут иметь различия, и поэтому их необходимо сверить и записать выверенные данные в конечное хранилище для анализа.
Дополнительно необходимо предусмотреть параллельный запуск нескольких реконсиляций на одной базе данных и адаптировать систему под большой объем, применив multiprocessing.
Модуль multiprocessing отлично подходит для распараллеливания операций в Python и позволяет в некотором смысле обходить определенные недостатки GIL. Возможностями данной библиотеки воспользуемся далее.
Архитектура разрабатываемой системы
Начальная подготовка
В качестве инструмента хранения данных будем использовать БД PostgreSQL в Docker-контейнере и взаимодействовать с нашей базой данных через pgAdmin, запущенном в контейнере:
После того, как все запустилось, не забудем указать в конфигурационном файле (conf/db.ini) строку подключения к БД (для учебного примера так можно!):
В принципе, использование контейнера не является обязательным и Вы можете использовать свой сервер БД.
Генерация входных данных
За генерацию тестовых данных отвечает Python-скрипт generate_test_data, который принимает на вход желаемое количество записей для генерации. Последовательность операции легко проследить по основной функции класса GenerateTestData:
Итак, функция выполняет следующие шаги:
Генерация тестового набора данных и последующая запись в текстовой файл формате CSV происходит следующим образом:
Особенность данной функции — запуск в нескольких распараллеленных асинхронных процессах, каждый из которых генерит свою порцию из 50К записей. Эта «фишка» позволит формировать файл на несколько миллионов строк достаточно быстро
После того, как завершится заполнение текстового файла, отрабатывается команда bulk_insert и все данные из этого файла попадают в таблицу transaction_db_raw.transaction_log.
Далее, в двух источниках будут содержаться совершенно одинаковые данные и реконсиляция не найдет ничего интересного, поэтому удаляем и изменяем несколько случайных строк в базе данных.
Запускаем скрипт и генерим тестовый CSV-файл с транзакциями на 10К строк:
На скриншоте видно, что получен файл на 10К строк, в БД загружено 10К, но затем из базы данных было удалено 112 строк и изменено еще 108. Итог: файл и таблица в БД отличаются между собой на 220 записей.
«Ну и где тут мультипроцессинг?», — спросите вы.
А его работу можно увидеть, когда будете генерить файл побольше, не на 10К записей, а, к примеру, на 1M. Попробуем?
После загрузки данных, удаления и изменения случайных записей, видим отличия текстового файла от таблицы: 19 939 строки (из них 10 022 удалено случайным образом, а 9 917 изменено).
На картинке видно, что генерация записей шла асинхронно, непоследовательно. Это означает, что следующий процесс может начаться без учета порядка запуска как только предыдущий завершится. Нет гарантии, что результат окажется в том же порядке, что и входные данные.
Один миллион строк не на самой быстрой виртуальной машине был «придуман» за 15.5 секунд — и это достойный вариант. Запустив эту же генерацию последовательно, без использования мультипроцессинга, я получил результат: генерация файла шла медленнее более чем в три раза (свыше 52 секунд вместо 15,5):
Адаптер для CSV
Этот адаптер хэширует строку, оставляя без изменения только первый столбец – идентификатор транзакции и сохраняет полученные данные в файл data/ transaction_hashed.csv. Финальным шагом его работы является загрузка этого файла при помощи команды COPY во временную таблицу схемы reconciliation_db.
Оптимальное чтение файла выполняется несколькими параллельными процессами. Читаем построчно, кусками по 5 мегабайт каждый. Цифра «5 мегабайт» была получена эмпирическим методом. Именно при таком размере одного фрагмента текста, удалось получить наименьшее время чтения больших файлов на своей виртуальной машине. Можно поэкспериментировать на своем окружении с данным параметром и посмотреть, как будет меняться время работы:
Пример чтения созданного ранее файла на 1М записей:
Использование временной таблицы с уникальным именем для каждого процесса реконсиляции позволяет дополнительно распараллеливать процесс сверки в одной базе данных.
Адаптер для PostgreSQL
Адаптер для обработки данных, хранящихся в таблице работает примерно по той же логике, что и адаптер для файла:
Исходя из размеров таблицы, происходит вычисление количества процессов, необходимых для обработки и внутри каждого процесса идет деление на 10 задач.
Поиск расхождений
Переходим к сверке данных, полученных от двух адаптеров.
Сверка (или получение отчета о расхождениях) происходит на стороне сервера баз данных, используя всю мощь языка SQL.
SQL-запрос совсем нехитрый – это всего лишь джойн таблицы с данными от адаптеров самой на себя по идентификатору транзакции:
На выходе получаем отчет:
Проверим, все ли правильно на картинке выше. Мы помним, что из таблицы в БД было удалено 9917 и изменено 10 022 строк. Итого 19939 строк, что и видно в отчете.
Итоговая таблица
Осталось только вставить в таблицу-хранилище «чистые» транзакции, которые совпадают по всем параметрам (по хэшу) в разных адаптерах. Этот процесс выполним следующим SQL-запросом:
Временную таблицу, которую мы использовали в качестве промежуточного хранения данных от адаптеров, можно удалить.
Заключение
В ходе проделанной работы была разработана система реконсиляции данных из разных источников: текстовый файл и таблица в базе данных. Использовали минимум дополнительных инструментов.
Возможно, искушенный читатель может заметить, что использование фреймворков типа Apache Spark вкупе с приведением исходных данных к паркетному формату, может значительно ускорить данный процесс, особенно для огромных объемов. Но основная цель данной работы – написание системы на «голом» Python и изучение мультипроцессинговой обработки данных. С чем мы, на мой взгляд, справились.
Исходный код всего проекта лежит в моем репозитории на GitHub, предлагаю с ним ознакомиться.
С удовольствием отвечу на все вопросы и ознакомлюсь с вашими замечаниями.