1

Прошу помочь разобраться, почему не работает COMMIT в Postgres?

Решаю такую задачу. Есть API с постраничной выдачей. Делаю синхронные постраничные запросы по 1000 объектов. Эти объекты асинхронно вставляю в таблицу БД. У меня есть код на Node.js + pg module, который делает это. Скрипт работает с БД под отдельным пользователем. Однако мне непонятно следующее поведение: запускаю скрипт и параллельно через другого пользователя в pgAdmin запросом select count(*) from objects смотрю число объектов, загруженных в таблицу. Оно не растёт, не смотря на то, что в скрипте явно открываю транзакцию и делаю коммиты по её завершению. Мне удаётся увидеть изменения только лишь после того, как скрипт полностью завершил работу. Если прервать выполнение скрипта вручную через ctrl-c, то коммиты, которые уже были сделаны тоже не сохраняются. Хотелось бы понять, почему так происходит, пробовал разные варианты, смотрел логи Postgres, коммиты там делаются, но изменений не видно, пока скрипт не отработает полностью.

Далее код: файл db.js

const Pool = require('pg').Pool
module.exports = Pool

файл index.js

const Pool = require('./db')
const itemsCount = 1000

const apiList = [ 'https://api1.example.com' 'https://api2.example.com' 'https://api3.example.com' ]

const dbOptions = { host: 'localhost', port: 5432, database: 'db_name', user: 'user', password: 'password', max: 20 }

apiList.forEach(api => { const url = api let page = 1 let objects = [] do { objects = getObjects(url, page) // эта функция постранично получает данные из API по 1000 штук. processObjects(objects, api) // эта функция загружает полученные объекты в БД. page++ } while (objects.length === itemsCount)
})

async function processObjects(objects, api) { const pool = new Pool(dbOptions) for (const object of objects) { const ID = object.ID const client = await pool.connect() const selectObjects = await client.query('select * from objects where id=$1', [ID]) if (selectObjects.rows.length === 0) { try { await client.query('BEGIN;') const insertObjects = await client.query(insert into objects ("id", "json", "api", "loaded") VALUES ($1, $2, $3, $4), [ID, object, api, 'now()']) await client.query('COMMIT;') } catch (error) { console.error(error.stack) } } await client.end() client.release() } await pool.end() }

  • Уберите точку с запятой из BEGIN и COMMIT. Так же, мне кажется, что каждую итерацию запрашивать новое соединение с БД не оптимально. Запросите его один раз перед циклом и юзайте его. После завершения уже отпускайте соединение. У вас есть все шансы выбрать все коннекты из пула(если он не большой). На сколько помню, когда вы освобождаете коннект, библиотека и сама БД еще некоторое время его чистят, т.е. он еще остается занят. – SwaD Sep 27 '23 at 09:51
  • forEach синхронный, операции в его колбеке - асинхронные. ни одного await в колбеке. вот - основные причины вашей проблемы – nörbörnën Sep 27 '23 at 09:58
  • @nörbörnën хоть forEach и синхронный, но на работу асинхронных фукнций это не влияет в данном случае, т.к. внутри forEach не требуется дожидаться ответа. forEach просто дергает Х раз функцию, которая работает уже асинхронно и внутри нее все await тоже работать будут. – SwaD Sep 27 '23 at 10:15
  • Наличие/отсутствие точек с запятой не влияет на проблему. По поводу пула, да, оптимально создавать один раз, просто ранее предполагал, что открытие/закрытие пула как-то влияет на коммиты. – Александр Беляков Sep 27 '23 at 14:12

1 Answers1

0

Вот так нужно написать (getObjects - заглушка для примера):

// @ts-check
const { setTimeout } = require('node:timers/promises');
const { randomUUID } = require('node:crypto');
const { Pool } = require('pg');

const pool = new Pool({ host: 'localhost', port: 5432, database: 'db_name', user: 'user', password: 'password', max: 20 });

run().catch(console.error);

async function run() { const itemsCount = 1000;

const apiList = [ 'https://api1.example.com', 'https://api2.example.com', 'https://api3.example.com', ];

const res = await Promise.allSettled( apiList.map(async (url) => { console.log([START] ${url}); await handleApiItem(url, itemsCount); console.log([DONE] ${url}); }), ); res.forEach((x) => { if (x.status === 'rejected') { console.error(x.reason); } });

await pool.end().catch(console.error); }

/**

  • Фyнкция, которая загрузит все данные из endpoint
  • @param {string} url
  • @param {number} [requestedItemsCount]

*/ async function handleApiItem(url, requestedItemsCount = 1000) { let page = 1; while (1) { try { const objects = await getObjects( url, { pageNumber: page, itemsCount: requestedItemsCount } ); if (objects?.length > 0) { await processObjects(objects, url); } if (!objects || objects.length < requestedItemsCount) { break; } page++;
} catch (err) { console.error(err); } } }

/**

  • Эта функция загружает полученные объекты в БД.
  • @param {Awaited<ReturnType<getObjects>>} objects
  • @param {string} api

*/ async function processObjects(objects, api) { const client = await pool.connect();

for (const object of objects) { const ID = object.ID; const selectObjects = await client.query('select id from objects where id=$1', [ID]); if (selectObjects.rows.length === 0) { try { await client.query('BEGIN;'); await client.query( insert into objects (&quot;id&quot;, &quot;json&quot;, &quot;api&quot;, &quot;loaded&quot;) VALUES ($1, $2, $3, $4), [ID, object, api, 'now()'] ); await client.query('COMMIT;') } catch (error) { console.error(error); } } }

client.release(); }

/**

  • Эта функция постранично получает данные из API по 1000 штук.
  • @param {string} url
  • @param {{ pageNumber: number, itemsCount?: number }} options

/ async function getObjects(url, options) { const timeout = Math.trunc(10 Math.random() + 1); await setTimeout(timeout);

return options.pageNumber > 10 && timeout % 5 === 0 ? [] : Array.from( { length: options.itemsCount || 1000 }, () => ({ ID: randomUUID() }) ); }

я бы дал совет в const selectObjects = await client.query('select id from objects where id=$1', [ID]); добавить and api=$2.

nörbörnën
  • 12,192
  • 5
  • 29
  • 40
  • Спасибо, данный ответ помог разобраться. Если переписать код по такому примеру, то коммиты работают корректно и фиксация происходит каждые 1000 объектов, когда освобождается клиент. По крайней мере так наблюдается для стороннего пользователя. Поигрался с кодом и заметил, что если убрать await перед processObjects, то проблема с коммитами воспроизведётся. Изначально не ставил await, т.к. хотел ускорить код. Ведь можно отдать 1000 объектов на загрузку в БД и делать следующий API-запрос, а теперь приходится ждать их полной загрузки в БД. Но это ладно, главное, понятно, как фиксить проблему. – Александр Беляков Sep 27 '23 at 14:10
  • По поводу дополнительного условия в SQL-запросе, согласен, это немного повысит скорость запроса на большой базе. – Александр Беляков Sep 27 '23 at 14:14
  • Await перед processObjects обязателен; хотите тут ускорения - используйте upsert или пакетный insert (и "пакетную" проверку перед ним). По поводу дополнительного условия, оно не для скорости, а для более селективной выборки записи по источнику данных (вы не застрахованы от того что в разных источниках будут записи с совпадающим идентификатором, но если источник данных на деле один - условие не нужно ставить). – nörbörnën Sep 27 '23 at 14:24
  • Подскажите, пожалуйста, как лучше сделать параллельную обработку каждого api ? В реализации выше она выполняется последовательно. Я немного переписал функцию run, но не уверен, что корректно, хотя на тестах запросы делаются параллельно. const listOfPromises = apiList.map(async (url) => { await handlePortal(url) }) const results = [] for (const promise of listOfPromises) { const index = await promise results.push(index) } const res = await Promise.allSettled(results) res.forEach((x) => { if (x.status === 'rejected') { console.error(x.reason) } }) – Александр Беляков Oct 02 '23 at 12:14
  • в ответе все записи apiList обрабатываются параллельно и корректно – nörbörnën Oct 02 '23 at 12:29