Ваша функция - worker(). Распаралелить можно, читая часть файла, и записывая обработанный результат в разные файлы
subprocess_run : запускать скрипт 'sub.py' в новой python сесии, те у кажного процесса собственный GIL, по идее самый быстрый вариант
concurrent_run, multiprocessing_run : чтение файла по частям,
чем больше lines_limit, тем больше процесс за один подход обработает данных, тем быстрее общее время
single_run: чтение файла целиком, однопоточное выполнение (для сравнения)
import re
import subprocess
import multiprocessing
import concurrent.futures
from itertools import islice
import timeit
def worker(args):
"""
читать rfile, начиная со строки begin_line до строки end_line
результат писать в wfile(писать в разные файлы)
"""
rfile, wfile, begin_line, end_line, enc = args
with open(rfile, encoding=enc, errors='ignore') as rf, open(wfile, 'w', encoding=enc, errors='ignore') as wf:
for line in islice(rf, begin_line, end_line):
wf.write('%s\n' % re.sub('\s+', '', line.replace(' ', ';')))
def subprocess_run(file='sub.py'):
"""выполнить (2 процесса), через новую python-сессию"""
m = max_line//2 # обработать строк за проход
with subprocess.Popen(['python', file, str([file_read, file_write % 1, 0, m, enc])]), \
subprocess.Popen(['python', file, str([file_read, file_write % 2, m, max_line, enc])]):
pass
def multiprocessing_run():
"""выполнить через multiprocessing"""
with multiprocessing.Pool(processes=2) as pool:
pool.map(worker, worker_args)
def concurrent_run():
"""выполнить через ProcessPoolExecutor"""
with concurrent.futures.ProcessPoolExecutor(max_workers=2) as pool:
pool.map(worker, worker_args)
def single_run():
"""выполнить однопоточно"""
worker([file_read, file_write % '_', 0, max_line, enc])
if __name__ == '__main__':
# исходные данные
enc = 'utf-8'
file_read = 'test2.txt'
file_write = 'test2_out_%s.txt'
start_line, stop_line = 0, 0
lines_limit = 1500000 # обработать строк за проход
max_line = len(list(open(file_read))) # кол-во строк файла
# для multiprocessing_run, concurrent_run
worker_args = []
while start_line < max_line:
start_line = stop_line
stop_line += lines_limit
args = file_read, file_write % start_line, start_line, stop_line, enc
worker_args.append(args)
# выполнить и замерить производительность
for fn in [subprocess_run, single_run, concurrent_run, multiprocessing_run]:
print(timeit.Timer(fn).repeat(1, 1), 'сек', fn.__name__)
файл 'sub.py'
import sys
import re
from itertools import islice
rfile, wfile, begin_line, end_line, enc = eval(sys.argv[1])
with open(rfile, encoding=enc, errors='ignore') as rf, open(wfile, 'w', encoding=enc, errors='ignore') as wf:
for line in islice(rf, begin_line, end_line):
wf.write('%s\n' % re.sub('\s+', '', line.replace(' ', ';')))
быстродействие, исходный файл 'test2.txt' - 120Mb ~ 3000000 строк :
[16.207453404993398] сек subprocess_run
[29.336879417459787] сек single_run
[18.94762167044125] сек concurrent_run
[20.828409269737875] сек multiprocessing_run
\\tsclientдиски/сеть другой работой загружены или что-то ещё 3- "заменить кодировку" (с utf-8 на utf-8 -- не ясно что вы имеете в виду) -- может вы хотите убедиться, что весь файл можно декодировать? к примеру, что нет lone surrogates? Или что-то другое? 4- "распараллелить процесс" -- или не нужно или бесполезно в зависимости от №2 – jfs Mar 09 '17 at 11:57io.open()укажите cp1251, а при записи utf-8. – jfs Mar 09 '17 at 21:33