0

Имеется текстовый файл овер 20гб В нем необходимо изменить кодировку, заменить разделитель для дальнейшей прогрузки в бд. По ходу конвертации появляются ошибки, незнаю че с ними делать, коды символов разные:

#Error: 'charmap' codec can't encode character '\xb9' in position 67: `character` maps to <undefined>, in line 0 str =
#Error: 'charmap' codec can't encode character '\x98' in position 197: character maps to <undefined>, in line 0 str =

И второй вопрос, можно ли как-то распараллелить процесс? может писать в разные файлы? В один поток за рабочий день файл не конвертируется. Но тогда вопрос, как делить исходный файл, читаю про потоки, но пока не пробовал. Исходный скрипт:

import codecs
import time
import re
file = codecs.open('\\\\tsclient\\C\\Users\\User\\Documents\\MDM\\MDM\\H_IDENTIFIER_000000.txt','r', 'utf-8') #добавил
out = open('\\\\tsclient\\C\\Users\\User\\Documents\\MDM\\MDM\\H_IDENTIFIER.txt', 'w')
n = 0
print('Start :'.format(), time.strftime('%c'))
for i in file:
    try:       
        out.write(re.sub('\s+','',i.replace('  ',';')) + '\n')       
    except Exception as e:
        print('Error: {}, in line {} str = '.format(e, n, i))
        pass
out.close()
print('end :'.format(), time.strftime('%c'))
jfs
  • 52,361
ss_beer
  • 751
  • Не вижу в коде указания кодировки, в которую конвертируете-то – andreymal Mar 09 '17 at 08:11
  • Дак сначала же, codecs.open(file,'utf-8') – ss_beer Mar 09 '17 at 09:23
  • это чтение файла с кодировкой utf-8, а запись файла с какой кодировкой? – andreymal Mar 09 '17 at 09:28
  • @andreymal а, сорри, тоже в utf-8, перед replace .encode('utf-8') должно быть – ss_beer Mar 09 '17 at 09:30
  • А версия Python какая? – andreymal Mar 09 '17 at 09:31
  • @andreymal Python 3.4 – ss_beer Mar 09 '17 at 09:34
  • 1
    постарайтесь в одном вопросе ограничиваться одной проблемой: у вас тут несколько проблем: 1- "'charmap' codec can't encode" 2- загрузить 20GB текст. файл в БД за рабочий день (20GB/8hour ~ 6Mbps) весьма умеренная скорость. Без подробностей нельзя понять, то ли \\tsclient диски/сеть другой работой загружены или что-то ещё 3- "заменить кодировку" (с utf-8 на utf-8 -- не ясно что вы имеете в виду) -- может вы хотите убедиться, что весь файл можно декодировать? к примеру, что нет lone surrogates? Или что-то другое? 4- "распараллелить процесс" -- или не нужно или бесполезно в зависимости от №2 – jfs Mar 09 '17 at 11:57
  • 5- заменить разделитель (не ясно что на что точно вы пытаетесь заменить -- известно ли вам про universal newlines mode?) – jfs Mar 09 '17 at 12:01
  • @jfs Да, вы верно подметили, проблем больше :( со всеми столкнулся по-ходу решения данной задачи. С кодировкой, изначально файл cp1251, открываю в utf-8 и пишу тоже в utf-8. Сам файл примерно 25Гб, я работаю на компе по RDP, там оракловый клиент и средства разрабоки, дисковое пространство ограничено, поэтому \tsclient это диск машинки, с которой хожу по RDP. RDP отрубается при длительном бездействии, пробовал оставить на ночь или выхи, отсюда желания про рабочий день. Разделитель заменяю чтобы привести файл к формату csv и прогрузить в БД – ss_beer Mar 09 '17 at 15:11
  • @jfs Файл не стуктурирован, в кач-ве разделителя табуляция и может еще пробелы, поэтому все заменяю на ;. Про universal newlines mode, к сожалению не известно - почитаю. – ss_beer Mar 09 '17 at 15:12
  • @ss_beer: суть в том, что вопрос как сейчас написан -- мало кому кроме вас может пригодиться. Если разбить, его на составляющие практические проблемы, то более вероятно ответы могут ком-то ещё кроме вас пригодятся -- в этом миссия Stack Overflow и состоит¶ Если файл в cp1251, то его следует открывать используя cp1251 кодировку. Если вы укажите utf-8, то либо кракозябры получите либо более вероятно UnicodeDecodeError. Если вы хотите превратить cp1251 файл в utf-8 файл, то при чтении с помощью io.open() укажите cp1251, а при записи utf-8. – jfs Mar 09 '17 at 21:33

2 Answers2

-1

Попробуйте такой код:

import re
import time
import codecs
import multiprocessing as mt
from queue import Empty as queue_Empty

INP_FILENAME = '\\\\tsclient\\C\\Users\\User\\Documents\\MDM\\MDM\\H_IDENTIFIER_000000.txt'
OUT_FILENAME = '\\\\tsclient\\C\\Users\\User\\Documents\\MDM\\MDM\\H_IDENTIFIER.txt'
ERR_FILENAME = '\\\\tsclient\\C\\Users\\User\\Documents\\MDM\\MDM\\H_IDENTIFIER_err.txt'

NUM_THREAD = `8


def read_line(filename, inp_queue):
    with codecs.open(INP_FILENAME,'r', 'utf-8') as f:
        for idx, line in enumerate(f): 
            #DEBUG print((idx, line))
            inp_queue.put((idx, line))
    pass


def processed_line(inp_queue, out_queue):
    while True:
        try:
            idx, line = inp_queue.get(): 
            #DEBUG print((idx, line))
            out_queue.put(re.sub(r'\s+','', line.replace('  ',';')) + '\n')
        except Exception:
            out_queue.put((idx, line))
    pass 


def write_line(out_filename, out_queue, err_filename):
    err_f = open(err_filename, 'w')
    with open(out_filename, 'w') as f:
        while True:
            data = out_queue.get(timeout=10): 
            #DEBUG print(data)
            try:
                f.write(data)
            except Exception:
                if isinstance(data, tuple):
                    err_f.write(str(data))
                else:
                    err_f.write(str(data[0]))
    pass


def main():
    inp_queue = mt.Queue(10000)
    out_queue = mt.Queue(10000)

    lprocess = []
    lprocess.append(mt.Process(target=read_line, args=(INP_FILENAME, inp_queue), daemon=True))
    for _ in range(NUM_THREAD):
        lprocess.append(mt.Process(target=processed_line, args=(inp_queue, out_queue), daemon=True))
    lprocess.append(mt.Process(target=write_line, args=(OUT_FILENAME, out_queue, ERR_FILENAME), daemon=True))

    for process in lprocess:
        process.start()

    lprocess.pop(0).join()
    print('end read')
    lprocess.pop(-1).join()
    print('end :'.format(), time.strftime('%c'))

if __name__ == '__main__':
    main()

P.S. Сюдя по путям вы читаете файл которых находиться на другом компютере, а это может быть очень медленно. Лучше перенесите место запуска или файл.

Andrio Skur
  • 2,873
  • Спасибо. При запуске получаю ошибку: for idx, line in enumerate(): TypeError: Required argument 'iterable' (pos 1) not found Разве можно без аргумента? – ss_beer Mar 09 '17 at 09:25
  • Добавил (f), теперь queue.Empty – ss_beer Mar 09 '17 at 09:36
  • Даже не знаю что сказать. Попробуйте раскоментировать строки #DEBUG – Andrio Skur Mar 09 '17 at 10:09
  • @ss_beer Не думаю что это может работать быстрее. Для каждой строки создается 3 ОТДЕЛЬНЫХ mt.Process() !!!. Те для миллиона строк, всего будет создано 3 милиона Process-объектов. Фактически это работает медленее чем однопоточный вариант. – vadim vaduxa Mar 09 '17 at 11:37
  • @vadimvaduxa Эмм что? Вы читали код? Тут создаеться 1 поток на чтения из файла в очередь inp_queue (read_line), 8 потоков на обработку inp_queue в out_queue(processed_line), и 1 поток на запись в файл (write_line). Где тут создание "3 отдельных mt.Process" на строку – Andrio Skur Mar 09 '17 at 11:53
  • да, дейсвительно. В любом случае при замере, ваш код у меня показывает 30 сек, что не быстрее однопоточного варианта. – vadim vaduxa Mar 09 '17 at 12:08
  • Если у вас Windows система, то вполне вероятно что на маленьких данных время на порождение mt.Process забирает все оптимизированое время. Хотя спорить не буду, может очереди медленно работают – Andrio Skur Mar 09 '17 at 12:14
  • @Andrio Skur Попробуйте привести ваш код в рабочее состояние и замерять время. Для файла 120 Mb - 625 сек. Тот же файл без потоков обрабатывается всего за 32сек, или за 20 сек в моем варианте – vadim vaduxa Mar 09 '17 at 12:31
-1

Ваша функция - worker(). Распаралелить можно, читая часть файла, и записывая обработанный результат в разные файлы

subprocess_run : запускать скрипт 'sub.py' в новой python сесии, те у кажного процесса собственный GIL, по идее самый быстрый вариант

concurrent_run, multiprocessing_run : чтение файла по частям, чем больше lines_limit, тем больше процесс за один подход обработает данных, тем быстрее общее время

single_run: чтение файла целиком, однопоточное выполнение (для сравнения)

import re
import subprocess
import multiprocessing
import concurrent.futures
from itertools import islice
import timeit


def worker(args):
    """
    читать rfile, начиная со строки begin_line до строки end_line
    результат писать в wfile(писать в разные файлы)
    """
    rfile, wfile, begin_line, end_line, enc = args
    with open(rfile, encoding=enc, errors='ignore') as rf, open(wfile, 'w', encoding=enc, errors='ignore') as wf:
        for line in islice(rf, begin_line, end_line):
            wf.write('%s\n' % re.sub('\s+', '', line.replace('  ', ';')))


def subprocess_run(file='sub.py'):
    """выполнить (2 процесса), через новую python-сессию"""
    m = max_line//2    # обработать строк за проход
    with subprocess.Popen(['python', file, str([file_read, file_write % 1, 0, m, enc])]), \
         subprocess.Popen(['python', file, str([file_read, file_write % 2, m, max_line, enc])]):
        pass

def multiprocessing_run():
    """выполнить через multiprocessing"""
    with multiprocessing.Pool(processes=2) as pool:
        pool.map(worker, worker_args)

def concurrent_run():
    """выполнить через ProcessPoolExecutor"""
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as pool:
        pool.map(worker, worker_args)

def single_run():
    """выполнить однопоточно"""
    worker([file_read, file_write % '_', 0, max_line, enc])


if __name__ == '__main__':
    # исходные данные
    enc = 'utf-8'
    file_read = 'test2.txt'
    file_write = 'test2_out_%s.txt'
    start_line, stop_line = 0, 0
    lines_limit = 1500000  # обработать строк за проход
    max_line = len(list(open(file_read)))  # кол-во строк файла

    # для multiprocessing_run, concurrent_run
    worker_args = []
    while start_line < max_line:
        start_line = stop_line
        stop_line += lines_limit
        args = file_read, file_write % start_line, start_line, stop_line, enc
        worker_args.append(args)

    # выполнить и замерить производительность
    for fn in [subprocess_run, single_run, concurrent_run, multiprocessing_run]:
        print(timeit.Timer(fn).repeat(1, 1), 'сек', fn.__name__)

файл 'sub.py'

import sys
import re
from itertools import islice

rfile, wfile, begin_line, end_line, enc = eval(sys.argv[1])

with open(rfile, encoding=enc, errors='ignore') as rf, open(wfile, 'w', encoding=enc, errors='ignore') as wf:
    for line in islice(rf, begin_line, end_line):
        wf.write('%s\n' % re.sub('\s+', '', line.replace('  ', ';')))

быстродействие, исходный файл 'test2.txt' - 120Mb ~ 3000000 строк :

[16.207453404993398] сек subprocess_run
[29.336879417459787] сек single_run
[18.94762167044125] сек concurrent_run
[20.828409269737875] сек multiprocessing_run
vadim vaduxa
  • 8,897
  • Спасибо, при запуске получаю ошибку на строке, где хотим получить max_line: unicodedecodeerror: charmap codec can't decode byte 0x98 – ss_beer Mar 10 '17 at 04:16
  • Разобрался. Чтобы получить Макс кол-во строк мы все равно должны прочитать этот файл в 25гб, верно? А нет ли метода: читать пока не конец? – ss_beer Mar 10 '17 at 06:06