꼬리와 비슷한 파이썬으로 파일의 마지막 n 줄 가져 오기
웹 응용 프로그램에 대한 로그 파일 뷰어를 작성 중이며 로그 파일 줄을 통해 페이지 매김하고 싶습니다. 파일의 항목은 맨 아래에있는 최신 항목을 기준으로합니다.
따라서 아래쪽에서 줄을 tail()
읽고 n
오프셋을 지원하는 방법이 필요합니다 . 내가 생각해 낸 것은 다음과 같습니다.
def tail(f, n, offset=0):
"""Reads a n lines from f with an offset of offset lines."""
avg_line_length = 74
to_read = n + offset
while 1:
f.seek(-(avg_line_length * to_read), 2)
except IOError:
# woops. apparently file is smaller than what we want
# to step back, go to the beginning instead
pos = f.tell()
lines = f.read().splitlines()
if len(lines) >= to_read or pos == 0:
return lines[-to_read:offset and -offset or None]
avg_line_length *= 1.3
이것이 합리적인 접근입니까? 오프셋을 사용하여 로그 파일을 테일링하는 권장 방법은 무엇입니까?
이것은 당신보다 빠를 수 있습니다. 줄 길이에 대한 가정을하지 않습니다. 올바른 수의 '\ n'문자를 찾을 때까지 파일을 한 번에 한 블록 씩 백업합니다.
def tail( f, lines=20 ):
total_lines_wanted = lines
f.seek(0, 2)
block_end_byte = f.tell()
lines_to_go = total_lines_wanted
block_number = -1
blocks = [] # blocks of size BLOCK_SIZE, in reverse order starting
# from the end of the file
while lines_to_go > 0 and block_end_byte > 0:
if (block_end_byte - BLOCK_SIZE > 0):
# read the last block we haven't yet read
f.seek(block_number*BLOCK_SIZE, 2)
# file too small, start from begining
# only read what was not read
lines_found = blocks[-1].count('\n')
lines_to_go -= lines_found
block_end_byte -= BLOCK_SIZE
block_number -= 1
all_read_text = ''.join(reversed(blocks))
return '\n'.join(all_read_text.splitlines()[-total_lines_wanted:])
나는 실용적으로-당신이 그런 것을 알 수 없을 때 줄 길이에 대한 까다로운 가정을 좋아하지 않습니다.
일반적으로 루프를 통과하는 첫 번째 또는 두 번째 패스에서 마지막 20 개 라인을 찾습니다. 74 자의 물건이 실제로 정확하면 블록 크기를 2048로 만들고 거의 즉시 20 줄을 꼬리에 붙입니다.
또한 물리적 OS 블록과의 정렬을 세밀하게하려고 많은 뇌 칼로리를 태우지 않습니다. 이러한 고급 I / O 패키지를 사용하면 OS 블록 경계에 맞추려고 시도 할 때 성능이 저하 될 수 있습니다. 낮은 수준의 I / O를 사용하면 속도가 향상 될 수 있습니다.
파이썬 2에서 유닉스 계열 시스템을 가정합니다.
import os
def tail(f, n, offset=0):
stdin,stdout = os.popen2("tail -n "+n+offset+" "+f)
lines = stdout.readlines(); stdout.close()
return lines[:,-offset]
파이썬 3의 경우 다음을 수행 할 수 있습니다.
import subprocess
def tail(f, n, offset=0):
proc = subprocess.Popen(['tail', '-n', n + offset, f], stdout=subprocess.PIPE)
lines = proc.stdout.readlines()
return lines[:, -offset]
전체 파일을 읽을 수 있으면 deque를 사용하십시오.
from collections import deque
deque(f, maxlen=n)
2.6 이전에는 deques에 maxlen 옵션이 없었지만 구현하기가 쉽습니다.
import itertools
def maxque(items, size):
items = iter(items)
q = deque(itertools.islice(items, size))
for item in items:
del q[0]
return q
끝에서 파일을 읽어야하는 경우 갤럽 (일명 지수) 검색을 사용하십시오.
def tail(f, n):
assert n >= 0
pos, lines = n+1, []
while len(lines) <= n:
f.seek(-pos, 2)
except IOError:
lines = list(f)
pos *= 2
return lines[-n:]
여기 내 대답이 있습니다. 순수한 파이썬. timeit을 사용하면 꽤 빠릅니다. 100,000 줄을 가진 로그 파일의 100 줄을 꼬리 :
>>> timeit.timeit('tail.tail(f, 100, 4098)', 'import tail; f = open("log.txt", "r");', number=10)
>>> timeit.timeit('tail.tail(f, 100, 4098)', 'import tail; f = open("log.txt", "r");', number=100)
>>> timeit.timeit('tail.tail(f, 100, 4098)', 'import tail; f = open("log.txt", "r");', number=1000)
>>> timeit.timeit('tail.tail(f, 100, 4098)', 'import tail; f = open("log.txt", "r");', number=10000)
>>> timeit.timeit('tail.tail(f, 100, 4098)', 'import tail; f = open("log.txt", "r");', number=100000)
코드는 다음과 같습니다.
import os
def tail(f, lines=1, _buffer=4098):
"""Tail a file and get X lines from the end"""
# place holder for the lines found
lines_found = []
# block counter will be multiplied by buffer
# to get the block size from the end
block_counter = -1
# loop until we find X lines
while len(lines_found) < lines:
f.seek(block_counter * _buffer, os.SEEK_END)
except IOError: # either file is too small, or too many lines requested
lines_found = f.readlines()
lines_found = f.readlines()
# we found enough lines, get out
# Removed this line because it was redundant the while will catch
# it, I left it for history
# if len(lines_found) > lines:
# break
# decrement the block counter to get the
# next X bytes
block_counter -= 1
return lines_found[-lines:]
위의 S.Lott의 대답은 거의 효과가 있지만 부분적으로 줄을 얻습니다. 데이터가 읽은 블록을 역순으로 보유하기 때문에 블록 경계의 데이터가 손상되는 것으로 나타났습니다. ''.join (data)을 호출하면 블록 순서가 잘못되었습니다. 이 문제를 해결합니다.
def tail(f, window=20):
Returns the last `window` lines of file `f` as a list.
f - a byte file-like object
if window == 0:
return []
BUFSIZ = 1024
f.seek(0, 2)
bytes = f.tell()
size = window + 1
block = -1
data = []
while size > 0 and bytes > 0:
if bytes - BUFSIZ > 0:
# Seek back one whole BUFSIZ
f.seek(block * BUFSIZ, 2)
# read BUFFER
data.insert(0, f.read(BUFSIZ))
# file too small, start from begining
# only read what was not read
data.insert(0, f.read(bytes))
linesFound = data[0].count('\n')
size -= linesFound
bytes -= BUFSIZ
block -= 1
return ''.join(data).splitlines()[-window:]
내가 사용한 코드. 나는 이것이 지금까지 최고라고 생각합니다.
def tail(f, n, offset=None):
"""Reads a n lines from f with an offset of offset lines. The return
value is a tuple in the form ``(lines, has_more)`` where `has_more` is
an indicator that is `True` if there are more lines in the file.
avg_line_length = 74
to_read = n + (offset or 0)
while 1:
f.seek(-(avg_line_length * to_read), 2)
except IOError:
# woops. apparently file is smaller than what we want
# to step back, go to the beginning instead
pos = f.tell()
lines = f.read().splitlines()
if len(lines) >= to_read or pos == 0:
return lines[-to_read:offset and -offset or None], \
len(lines) > to_read or pos > 0
avg_line_length *= 1.3
mmap을 사용한 간단하고 빠른 솔루션 :
import mmap
import os
def tail(filename, n):
"""Returns last n lines from the filename. No exception handling"""
size = os.path.getsize(filename)
with open(filename, "rb") as f:
# for Windows the mmap parameters are different
fm = mmap.mmap(f.fileno(), 0, mmap.MAP_SHARED, mmap.PROT_READ)
for i in xrange(size - 1, -1, -1):
if fm[i] == '\n':
n -= 1
if n == -1:
return fm[i + 1 if i else 0:].splitlines()
삽입하지 않고 추가하고 뒤집는 더 깨끗한 python3 호환 버전 :
def tail(f, window=1):
Returns the last `window` lines of file `f` as a list of bytes.
if window == 0:
return b''
BUFSIZE = 1024
f.seek(0, 2)
end = f.tell()
nlines = window + 1
data = []
while nlines > 0 and end > 0:
i = max(0, end - BUFSIZE)
nread = min(end, BUFSIZE)
chunk = f.read(nread)
nlines -= chunk.count(b'\n')
end -= nread
return b'\n'.join(b''.join(reversed(data)).splitlines()[-window:])
다음과 같이 사용하십시오.
with open(path, 'rb') as f:
last_lines = tail(f, 3).decode('utf-8')
위의 Popen이 최고의 솔루션이라는 것을 알았습니다. 빠르고 더럽고 작동합니다. 유닉스 머신의 파이썬 2.6에서는 다음을 사용했습니다.
def GetLastNLines(self, n, fileName):
Name: Get LastNLines
Description: Gets last n lines using Unix tail
Output: returns last n lines of a file
Keyword argument:
n -- number of last lines to return
filename -- Name of the file you need to tail into
p=subprocess.Popen(['tail','-n',str(n),self.__fileName], stdout=subprocess.PIPE)
return soutput
soutput은 코드의 마지막 n 줄을 포함합니다. soutput을 통해 한 줄씩 반복하려면 :
for line in GetLastNLines(50,'myfile.log').split('\n'):
print line
@papercrane 솔루션을 python3으로 업데이트하십시오. 와 함께 파일을 엽니 다 open(filename, 'rb')
def tail(f, window=20):
"""Returns the last `window` lines of file `f` as a list.
if window == 0:
return []
BUFSIZ = 1024
f.seek(0, 2)
remaining_bytes = f.tell()
size = window + 1
block = -1
data = []
while size > 0 and remaining_bytes > 0:
if remaining_bytes - BUFSIZ > 0:
# Seek back one whole BUFSIZ
f.seek(block * BUFSIZ, 2)
# read BUFFER
bunch = f.read(BUFSIZ)
# file too small, start from beginning
f.seek(0, 0)
# only read what was not read
bunch = f.read(remaining_bytes)
bunch = bunch.decode('utf-8')
data.insert(0, bunch)
size -= bunch.count('\n')
remaining_bytes -= BUFSIZ
block -= 1
return ''.join(data).splitlines()[-window:]
같은 기술이 파일의 마지막 줄을 변경하는 데 사용 된 유사한 질문 에 대한 내 대답에 대한 의견을 제시 한 사람에게 대답을 게시하는 것입니다.
크기가 큰 파일의 mmap
경우 가장 좋은 방법입니다. 기존 mmap
답변 을 개선하기 위해이 버전은 Windows와 Linux 사이에서 이식 가능하며 더 빠르게 실행됩니다 (GB 범위의 파일로 32 비트 Python에서 수정하지 않으면 작동하지 않지만 다른 처리 방법에 대한 힌트 는 다른 답변을 참조하십시오) , 그리고 파이썬 2에서 작동하도록 수정하기 위해 ).
import io # Gets consistent version of open for both Py2.7 and Py3.x
import itertools
import mmap
def skip_back_lines(mm, numlines, startidx):
'''Factored out to simplify handling of n and offset'''
for _ in itertools.repeat(None, numlines):
startidx = mm.rfind(b'\n', 0, startidx)
if startidx < 0:
return startidx
def tail(f, n, offset=0):
# Reopen file in binary mode
with io.open(f.name, 'rb') as binf, mmap.mmap(binf.fileno(), 0, access=mmap.ACCESS_READ) as mm:
# len(mm) - 1 handles files ending w/newline by getting the prior line
startofline = skip_back_lines(mm, offset, len(mm) - 1)
if startofline < 0:
return [] # Offset lines consumed whole file, nothing to return
# If using a generator function (yield-ing, see below),
# this should be a plain return, no empty list
endoflines = startofline + 1 # Slice end to omit offset lines
# Find start of lines to capture (add 1 to move from newline to beginning of following line)
startofline = skip_back_lines(mm, n, startofline) + 1
# Passing True to splitlines makes it return the list of lines without
# removing the trailing newline (if any), so list mimics f.readlines()
return mm[startofline:endoflines].splitlines(True)
# If Windows style \r\n newlines need to be normalized to \n, and input
# is ASCII compatible, can normalize newlines with:
# return mm[startofline:endoflines].replace(os.linesep.encode('ascii'), b'\n').splitlines(True)
이것은 꼬리가있는 줄의 수가 작아서 한 번에 메모리로 안전하게 읽을 수있을 정도로 작다고 가정합니다. 마지막 줄을 다음과 같이 바꾸어 생성기 함수로 만들고 한 번에 한 줄씩 수동으로 읽을 수도 있습니다.
# Call mm.readline n times, or until EOF, whichever comes first
# Python 3.2 and earlier:
for line in itertools.islice(iter(mm.readline, b''), n):
yield line
# 3.3+:
yield from itertools.islice(iter(mm.readline, b''), n)
마지막으로,이 (사용에 필요한 바이너리 모드로 읽기 mmap
가 제공되도록) str
라인 (Py2)과 bytes
선 (Py3를); unicode
(Py2) 또는 str
(Py3) 을 원하는 경우 반복 접근 방식을 조정하여 사용자를 위해 디코딩하거나 줄 바꿈을 수정할 수 있습니다.
lines = itertools.islice(iter(mm.readline, b''), n)
if f.encoding: # Decode if the passed file was opened with a specific encoding
lines = (line.decode(f.encoding) for line in lines)
if 'b' not in f.mode: # Fix line breaks if passed file opened in text mode
lines = (line.replace(os.linesep, '\n') for line in lines)
# Python 3.2 and earlier:
for line in lines:
yield line
# 3.3+:
yield from lines
참고 : 테스트 할 Python에 액세스 할 수없는 컴퓨터 에서이 모든 것을 입력했습니다. 내가 무엇인가를 입력하면 알려주십시오. 이것은 내가 작동해야 한다고 생각 하는 다른 대답 과 충분히 비슷 하지만 조정 (예 : 처리 )으로 미묘한 오류가 발생할 수 있습니다. 실수가 있으면 의견에 알려주십시오.offset
S.Lott의 최고 투표 답변 (Sep 25 '08 at 21:43)을 기반으로하지만 작은 파일로 수정되었습니다.
def tail(the_file, lines_2find=20):
the_file.seek(0, 2) #go to end of file
bytes_in_file = the_file.tell()
lines_found, total_bytes_scanned = 0, 0
while lines_2find+1 > lines_found and bytes_in_file > total_bytes_scanned:
byte_block = min(1024, bytes_in_file-total_bytes_scanned)
the_file.seek(-(byte_block+total_bytes_scanned), 2)
total_bytes_scanned += byte_block
lines_found += the_file.read(1024).count('\n')
the_file.seek(-total_bytes_scanned, 2)
line_list = list(the_file.readlines())
return line_list[-lines_2find:]
#we read at least 21 line breaks from the bottom, block by block for speed
#21 to ensure we don't get a half line
이것이 유용하기를 바랍니다.
pip를 사용하여 설치할 수있는 pypi에 tail의 일부 기존 구현이 있습니다.
- mtFileUtil
- 멀티 테일
- log4tailer
- ...
상황에 따라 이러한 기존 도구 중 하나를 사용하면 이점이있을 수 있습니다.
다음은 매우 간단한 구현입니다.
with open('/etc/passwd', 'r') as f:
s = ''
while s.count('\n') < 11:
cur = f.tell()
f.seek((cur - 10))
s = f.read(10) + s
f.seek((cur - 10))
print s
except Exception as e:
단순 :
with open("test.txt") as f:
data = f.readlines()
tail = data[-2:]
매우 큰 파일을 효율적으로 사용하려면 (꼬리를 사용하려는 로그 파일 상황에서 일반적으로) 전체 파일을 한 번에 메모리로 읽지 않고 수행하더라도 전체 파일을 읽지 않는 것이 좋습니다. 어떻게 든 문자가 아닌 라인에서 오프셋을 해결해야합니다. 한 가지 가능성은 seek () char을 char로 뒤로 읽는 것이지만 매우 느립니다. 대신 더 큰 블록으로 처리하는 것이 좋습니다.
나는 여기에서 사용할 수있는 파일을 뒤로 읽기 위해 얼마 전에 작성한 유틸리티 기능을 가지고 있습니다.
import os, itertools
def rblocks(f, blocksize=4096):
"""Read file as series of blocks from end of file to start.
The data itself is in normal order, only the order of the blocks is reversed.
ie. "hello world" -> ["ld","wor", "lo ", "hel"]
Note that the file must be opened in binary mode.
if 'b' not in f.mode.lower():
raise Exception("File must be opened using binary mode.")
size = os.stat(f.name).st_size
fullblocks, lastblock = divmod(size, blocksize)
# The first(end of file) block will be short, since this leaves
# the rest aligned on a blocksize boundary. This may be more
# efficient than having the last (first in file) block be short
yield f.read(lastblock)
for i in range(fullblocks-1,-1, -1):
f.seek(i * blocksize)
yield f.read(blocksize)
def tail(f, nlines):
buf = ''
result = []
for block in rblocks(f):
buf = block + buf
lines = buf.splitlines()
# Return all lines except the first (since may be partial)
if lines:
result.extend(lines[1:]) # First line may not be complete
if(len(result) >= nlines):
return result[-nlines:]
buf = lines[0]
return ([buf]+result)[-nlines:]
for line in tail(f, 20):
print line
[편집] 더 구체적인 버전을 추가했습니다 (두 번 되돌릴 필요가 없습니다)
f.seek (0, 2)를 사용하여 파일 끝으로 이동 한 다음 readline ()을 다음과 같이 바꾸어 줄을 하나씩 읽을 수 있습니다.
def readline_backwards(self, f):
backline = ''
last = ''
while not last == '\n':
backline = last + backline
if f.tell() <= 0:
return backline
f.seek(-1, 1)
last = f.read(1)
f.seek(-1, 1)
backline = last
last = ''
while not last == '\n':
backline = last + backline
if f.tell() <= 0:
return backline
f.seek(-1, 1)
last = f.read(1)
f.seek(-1, 1)
f.seek(1, 1)
return backline
Eyecue 답변 (Jun 10 '10 at 21:28)을 기반으로 :이 클래스는 파일 객체에 head () 및 tail () 메서드를 추가합니다.
class File(file):
def head(self, lines_2find=1):
self.seek(0) #Rewind file
return [self.next() for x in xrange(lines_2find)]
def tail(self, lines_2find=1):
self.seek(0, 2) #go to end of file
bytes_in_file = self.tell()
lines_found, total_bytes_scanned = 0, 0
while (lines_2find+1 > lines_found and
bytes_in_file > total_bytes_scanned):
byte_block = min(1024, bytes_in_file-total_bytes_scanned)
self.seek(-(byte_block+total_bytes_scanned), 2)
total_bytes_scanned += byte_block
lines_found += self.read(1024).count('\n')
self.seek(-total_bytes_scanned, 2)
line_list = list(self.readlines())
return line_list[-lines_2find:]
f = File('path/to/file', 'r')
파일이 \ n으로 끝나지 않거나 첫 번째 줄 전체를 읽지 못하는 경우 이러한 솔루션 중 일부에 문제가 있습니다.
def tail(file, n=1, bs=1024):
f = open(file)
l = 1-f.read(1).count('\n') # If file doesn't end in \n, count it anyway.
B = f.tell()
while n >= l and B > 0:
block = min(bs, B)
B -= block
f.seek(B, 0)
l += f.read(block).count('\n')
f.seek(B, 0)
l = min(l,n) # discard first (incomplete) line if l > n
lines = f.readlines()[-l:]
return lines
다른 해결책
txt 파일이 다음과 같은 경우 : 마우스 뱀 고양이 도마뱀 늑대 개
파이썬에서 배열 인덱싱을 사용 하여이 파일을 되돌릴 수 있습니다 ''
def tail(contents,n):
with open('file.txt') as file:
for i in file.readlines():
for i in contents[:n:-1]:
결과 : 개 늑대 도마뱀 고양이
파일의 마지막 줄에서 특정 값을 읽고이 스레드를 우연히 발견했습니다. 파이썬에서 바퀴를 재발 명하기보다는 / usr / local / bin / get_last_netp로 저장된 작은 쉘 스크립트가 생겼습니다.
#! /bin/bash
tail -n1 /home/leif/projects/transfer/export.log | awk {'print $14'}
그리고 파이썬 프로그램에서 :
from subprocess import check_output
last_netp = int(check_output("/usr/local/bin/get_last_netp"))
deque를 사용하는 첫 번째 예는 아니지만 더 간단한 예입니다. 이것은 일반적인 것입니다. 파일이 아니라 반복 가능한 객체에서 작동합니다.
#!/usr/bin/env python
import sys
import collections
def tail(iterable, N):
deq = collections.deque()
for thing in iterable:
if len(deq) >= N:
for thing in deq:
yield thing
if __name__ == '__main__':
for line in tail(sys.stdin,10):
This is my version of tailf
import sys, time, os
filename = 'path to file'
with open(filename) as f:
size = os.path.getsize(filename)
if size < 1024:
s = size
s = 999
f.seek(-s, 2)
l = f.read()
print l
while True:
line = f.readline()
if not line:
print line
except IOError:
import time
attemps = 600
wait_sec = 5
fname = "YOUR_PATH"
with open(fname, "r") as f:
where = f.tell()
for i in range(attemps):
line = f.readline()
if not line:
print line, # already has newline
import itertools
fname = 'log.txt'
offset = 5
n = 10
with open(fname) as f:
n_last_lines = list(reversed([x for x in itertools.islice(f, None)][-(offset+1):-(offset+n+1):-1]))
abc = "2018-06-16 04:45:18.68"
filename = "abc.txt"
with open(filename) as myFile:
for num, line in enumerate(myFile, 1):
if abc in line:
lastline = num
print "last occurance of work at file is in "+str(lastline)
이를 수행 할 수있는 매우 유용한 모듈 이 있습니다.
from file_read_backwards import FileReadBackwards
with FileReadBackwards("/tmp/file", encoding="utf-8") as frb:
# getting lines by lines starting from the last line up
for l in frb:
A.Coady가 제공 한 답변 업데이트
파이썬 3에서 작동합니다 .
이것은 지수 검색을 사용 N
하며 뒤에서 한 줄만 버퍼링하며 매우 효율적입니다.
import time
import os
import sys
def tail(f, n):
assert n >= 0
pos, lines = n+1, []
# set file pointer to end
f.seek(0, os.SEEK_END)
isFileSmall = False
while len(lines) <= n:
f.seek(f.tell() - pos, os.SEEK_SET)
except ValueError as e:
# lines greater than file seeking size
# seek to start
isFileSmall = True
except IOError:
print("Some problem reading/seeking the file")
lines = f.readlines()
if isFileSmall:
pos *= 2
return lines[-n:]
with open("stream_logs.txt") as f:
다시 생각하면, 이것은 아마도 여기의 다른 것만 큼 빠릅니다.
def tail( f, window=20 ):
lines= ['']*window
count= 0
for l in f:
lines[count%window]= l
count += 1
print lines[count%window:], lines[:count%window]
훨씬 간단합니다. 그리고 좋은 속도로 찢어지는 것 같습니다.
파일의 첫 번째 또는 마지막 N 줄을 찾는 가장 쉬운 방법을 찾았습니다.
파일의 마지막 N 줄 (예 : N = 10)
for ran in range((len(liner)-N),len(liner)):
print liner[ran]
파일의 첫 N 줄 (예 : N = 10)
for ran in range(0,N+1):
print liner[ran]
참고 URL : https://stackoverflow.com/questions/136168/get-last-n-lines-of-a-file-with-python-similar-to-tail
