파이썬에서 10 만 건의 HTTP 요청을 보내는 가장 빠른 방법은 무엇입니까?
URL이 100,000 개인 파일을 열었습니다. 각 URL에 HTTP 요청을 보내고 상태 코드를 인쇄해야합니다. 저는 Python 2.6을 사용하고 있으며 지금까지 Python이 스레딩 / 동시성을 구현하는 많은 혼란스러운 방법을 살펴 보았습니다. 나는 파이썬 동시성 라이브러리를 보았지만이 프로그램을 올바르게 작성하는 방법을 알 수 없습니다. 누구나 비슷한 문제를 겪었습니까? 필자는 일반적으로 가능한 빨리 파이썬에서 수천 개의 작업을 수행하는 방법을 알아야한다고 생각합니다. '동시'를 의미한다고 생각합니다.
트위스트리스 솔루션 :
from urlparse import urlparse
from threading import Thread
import httplib, sys
from Queue import Queue
concurrent = 200
def doWork():
while True:
url = q.get()
status, url = getStatus(url)
doSomethingWithResult(status, url)
q.task_done()
def getStatus(ourl):
try:
url = urlparse(ourl)
conn = httplib.HTTPConnection(url.netloc)
conn.request("HEAD", url.path)
res = conn.getresponse()
return res.status, ourl
except:
return "error", ourl
def doSomethingWithResult(status, url):
print status, url
q = Queue(concurrent * 2)
for i in range(concurrent):
t = Thread(target=doWork)
t.daemon = True
t.start()
try:
for url in open('urllist.txt'):
q.put(url.strip())
q.join()
except KeyboardInterrupt:
sys.exit(1)
이것은 꼬인 솔루션보다 약간 빠르며 CPU를 덜 사용합니다.
토네이도 비동기 네트워킹 라이브러리를 사용하는 솔루션
from tornado import ioloop, httpclient
i = 0
def handle_request(response):
print(response.code)
global i
i -= 1
if i == 0:
ioloop.IOLoop.instance().stop()
http_client = httpclient.AsyncHTTPClient()
for url in open('urls.txt'):
i += 1
http_client.fetch(url.strip(), handle_request, method='HEAD')
ioloop.IOLoop.instance().start()
스레드는 절대 답이 아닙니다. 전체 목표가 "가장 빠른 방법"인 경우 허용되지 않는 처리량 제한뿐만 아니라 프로세스 및 커널 병목 현상을 모두 제공합니다.
약간 twisted
의 비동기 HTTP
클라이언트는 훨씬 나은 결과를 제공합니다.
2010 년 이후이 게시물이 게시되어 다른 답변을 모두 시도하지는 않았지만 몇 가지를 시도했지만 python3.6을 사용하여 가장 잘 작동하는 것으로 나타났습니다.
AWS에서 실행되는 초당 약 150 개의 고유 도메인을 가져올 수있었습니다.
import pandas as pd
import concurrent.futures
import requests
import time
out = []
CONNECTIONS = 100
TIMEOUT = 5
tlds = open('../data/sample_1k.txt').read().splitlines()
urls = ['http://{}'.format(x) for x in tlds[1:]]
def load_url(url, timeout):
ans = requests.head(url, timeout=timeout)
return ans.status_code
with concurrent.futures.ThreadPoolExecutor(max_workers=CONNECTIONS) as executor:
future_to_url = (executor.submit(load_url, url, TIMEOUT) for url in urls)
time1 = time.time()
for future in concurrent.futures.as_completed(future_to_url):
try:
data = future.result()
except Exception as exc:
data = str(type(exc))
finally:
out.append(data)
print(str(len(out)),end="\r")
time2 = time.time()
print(f'Took {time2-time1:.2f} s')
print(pd.Series(out).value_counts())
grequests를 사용하십시오 . 그것은 request + Gevent 모듈의 조합입니다.
GRequests를 사용하면 Gevent에 요청을 사용하여 비동기 HTTP 요청을 쉽게 만들 수 있습니다.
사용법은 간단합니다.
import grequests
urls = [
'http://www.heroku.com',
'http://tablib.org',
'http://httpbin.org',
'http://python-requests.org',
'http://kennethreitz.com'
]
보내지 않은 요청 세트를 작성하십시오.
>>> rs = (grequests.get(u) for u in urls)
동시에 모두 보내기 :
>>> grequests.map(rs)
[<Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>]
이 문제를 해결하는 좋은 방법은 먼저 하나의 결과를 얻는 데 필요한 코드를 작성한 다음 스레딩 코드를 통합하여 응용 프로그램을 병렬화하는 것입니다.
완벽한 세계에서 이것은 단순히 나중에 처리하기 위해 사전 또는 목록으로 결과를 출력하는 10 만 개의 스레드를 동시에 시작한다는 것을 의미하지만 실제로이 방식으로 발행 할 수있는 병렬 HTTP 요청 수는 제한되어 있습니다. 로컬에서는 동시에 열 수있는 소켓 수, 파이썬 인터프리터가 허용하는 실행 스레드 수에 제한이 있습니다. 원격으로 모든 요청이 한 서버 또는 여러 서버에 대한 경우 동시 연결 수가 제한 될 수 있습니다. 이러한 제한 사항으로 인해 한 번에 적은 양의 URL 만 폴링하는 방식으로 스크립트를 작성해야 할 수도 있습니다 (다른 포스터에서 언급했듯이 100은 적절한 스레드 풀 크기 일 수 있음). 더 많은 것을 성공적으로 배포 할 수 있습니다).
이 디자인 패턴에 따라 위의 문제를 해결할 수 있습니다.
- 현재 실행중인 스레드 수 (threading.active_count ()를 통해 또는 스레드 객체를 데이터 구조로 푸시하여 스레드를 추적 할 수 있음)가 최대 동시 요청 수 (예 : 100)가 될 때까지 새 요청 스레드를 시작하는 스레드를 시작합니다. 그런 다음 짧은 시간 동안 절전 모드로 전환됩니다. 처리 할 URL이 더 이상 없으면이 스레드가 종료되어야합니다. 따라서 스레드는 계속 깨어나고 새 스레드를 시작하며 완료 될 때까지 대기합니다.
- 요청 스레드가 결과를 나중에 검색 및 출력 할 수 있도록 일부 데이터 구조에 저장하도록하십시오. 결과를 저장하는 구조가
list
또는dict
CPython 인 경우 잠금없이 스레드에서 고유 항목을 안전하게 추가하거나 삽입 할 수 있지만 파일에 쓰거나보다 복잡한 크로스 스레드 데이터 상호 작용 이 필요한 경우 이 상태를 손상으로부터 보호하기위한 상호 배제 잠금 .
스레딩 모듈 을 사용하는 것이 좋습니다 . 이를 사용하여 실행중인 스레드를 시작하고 추적 할 수 있습니다. 파이썬의 스레딩 지원은 거의 없지만 문제에 대한 설명은 귀하의 요구에 충분하다는 것을 암시합니다.
마지막으로, 파이썬으로 작성된 병렬 네트워크 애플리케이션의 간단한 애플리케이션을 보려면 ssh.py를 확인하십시오 . Python 스레딩을 사용하여 많은 SSH 연결을 병렬화하는 작은 라이브러리입니다. 디자인은 요구 사항에 충분히 근접하여 좋은 리소스가 될 수 있습니다.
최상의 성능을 얻으려면 스레드 대신 비동기 I / O를 사용하는 것이 좋습니다. 수천 개의 OS 스레드와 관련된 오버 헤드는 사소한 것이 아니며 Python 인터프리터 내의 컨텍스트 전환은 그 위에 훨씬 더 추가됩니다. 스레딩은 확실히 작업을 수행하지만 비동기 경로가 더 나은 전반적인 성능을 제공 할 것으로 생각합니다.
특히, Twisted 라이브러리 ( http://www.twistedmatrix.com ) 의 비동기 웹 클라이언트를 제안합니다 . 그것은 확실히 가파른 학습 곡선을 가지고 있지만 Twisted의 비동기 프로그래밍 스타일을 잘 다루면 사용하기가 쉽습니다.
Twisted의 비동기 웹 클라이언트 API에 대한 HowTo는 다음에서 구할 수 있습니다.
http://twistedmatrix.com/documents/current/web/howto/client.html
해결책 :
from twisted.internet import reactor, threads
from urlparse import urlparse
import httplib
import itertools
concurrent = 200
finished=itertools.count(1)
reactor.suggestThreadPoolSize(concurrent)
def getStatus(ourl):
url = urlparse(ourl)
conn = httplib.HTTPConnection(url.netloc)
conn.request("HEAD", url.path)
res = conn.getresponse()
return res.status
def processResponse(response,url):
print response, url
processedOne()
def processError(error,url):
print "error", url#, error
processedOne()
def processedOne():
if finished.next()==added:
reactor.stop()
def addTask(url):
req = threads.deferToThread(getStatus, url)
req.addCallback(processResponse, url)
req.addErrback(processError, url)
added=0
for url in open('urllist.txt'):
added+=1
addTask(url.strip())
try:
reactor.run()
except KeyboardInterrupt:
reactor.stop()
시험 시간 :
[kalmi@ubi1:~] wc -l urllist.txt
10000 urllist.txt
[kalmi@ubi1:~] time python f.py > /dev/null
real 1m10.682s
user 0m16.020s
sys 0m10.330s
[kalmi@ubi1:~] head -n 6 urllist.txt
http://www.google.com
http://www.bix.hu
http://www.godaddy.com
http://www.google.com
http://www.bix.hu
http://www.godaddy.com
[kalmi@ubi1:~] python f.py | head -n 6
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
핑 타임 :
bix.hu is ~10 ms away from me
godaddy.com: ~170 ms
google.com: ~30 ms
나는 이것이 오래된 질문이라는 것을 알고 있지만 Python 3.7에서는 asyncio
and를 사용 하여이 작업을 수행 할 수 있습니다 aiohttp
.
import asyncio
import aiohttp
from aiohttp import ClientSession, ClientConnectorError
async def fetch_html(url: str, session: ClientSession, **kwargs) -> tuple:
try:
resp = await session.request(method="GET", url=url, **kwargs)
except ClientConnectorError:
return (url, 404)
return (url, resp.status)
async def make_requests(urls: set, **kwargs) -> None:
async with ClientSession() as session:
tasks = []
for url in urls:
tasks.append(
fetch_html(url=url, session=session, **kwargs)
)
results = await asyncio.gather(*tasks)
for result in results:
print(f'{result[1]} - {str(result[0])}')
if __name__ == "__main__":
import pathlib
import sys
assert sys.version_info >= (3, 7), "Script requires Python 3.7+."
here = pathlib.Path(__file__).parent
with open(here.joinpath("urls.txt")) as infile:
urls = set(map(str.strip, infile))
asyncio.run(make_requests(urls=urls))
이에 대한 자세한 내용은 여기 에서 예를 참조 하십시오 .
스레드 풀을 사용하는 것이 좋은 옵션이며 상당히 쉽습니다. 불행히도 파이썬에는 스레드 풀을 매우 쉽게 만드는 표준 라이브러리가 없습니다. 그러나 여기에 시작 해야하는 괜찮은 라이브러리가 있습니다 : http://www.chrisarndt.de/projects/threadpool/
그들의 사이트에서 코드 예제 :
pool = ThreadPool(poolsize)
requests = makeRequests(some_callable, list_of_args, callback)
[pool.putRequest(req) for req in requests]
pool.wait()
도움이 되었기를 바랍니다.
만들기 epoll
개체,
오픈 많은 클라이언트 TCP 소켓을
요청 헤더보다 조금 더 될 자신의 전송 버퍼를 조정
요청 헤더를 보낼 - 그것은 단지 버퍼에 배치, 즉시 수에 소켓을 등록해야 epoll
객체,
수행 .poll
에 epoll
obect,
처음 3 읽기 바이트에서 각각의 소켓에서 .poll
,
에 기록 sys.stdout
다음 \n
(플러시하지 않습니다), 가까운 클라이언트 소켓.
동시에 열린 소켓 수 제한 — 소켓을 만들 때 오류를 처리합니다. 다른 소켓이 닫힌 경우에만 새 소켓을 작성하십시오.
OS 한계를 조정하십시오.
몇 개 (많지 않은) 프로세스로 분기 해보십시오. 이렇게하면 CPU를 좀 더 효과적으로 사용할 수 있습니다.
귀하의 경우, 스레딩은 아마도 응답을 기다리는 데 대부분의 시간을 소비 할 것이기 때문에 아마도 트릭을 할 것입니다. 표준 라이브러리에는 Queue 와 같은 유용한 모듈 이 있습니다.
나는 파일을 병렬로 다운로드하는 것과 비슷한 일을했고 나에게 충분했지만 당신이 말하는 규모에는 맞지 않았다.
작업이 더 많은 CPU 바운드 인 경우 더 많은 CPU / 코어 / 스레드를 사용할 수 있는 멀티 프로세싱 모듈을 살펴볼 수 있습니다 (프로세스 당 잠금이므로 서로 차단하지 않는 더 많은 프로세스)
사용을 고려 풍차 풍차 아마 많은 스레드를 수행하지 못할지라도.
5 개의 머신에서 수동 롤링 된 Python 스크립트를 사용하여이를 수행 할 수 있습니다. 각 머신은 포트 40000-60000을 사용하여 아웃 바운드 연결하여 100,000 개의 포트 연결을 엽니 다.
또한 각 서버가 얼마나 많은 양을 처리 할 수 있는지 알기 위해 OpenSTA 와 같이 훌륭하게 스레드 된 QA 앱으로 샘플 테스트를 수행하는 데 도움이 될 수 있습니다.
또한 LWP :: ConnCache 클래스와 함께 간단한 Perl을 사용하십시오. 그런 식으로 더 많은 성능 (더 많은 연결)을 얻을 수 있습니다.
이 꼬인 비동기 웹 클라이언트는 꽤 빠릅니다.
#!/usr/bin/python2.7
from twisted.internet import reactor
from twisted.internet.defer import Deferred, DeferredList, DeferredLock
from twisted.internet.defer import inlineCallbacks
from twisted.web.client import Agent, HTTPConnectionPool
from twisted.web.http_headers import Headers
from pprint import pprint
from collections import defaultdict
from urlparse import urlparse
from random import randrange
import fileinput
pool = HTTPConnectionPool(reactor)
pool.maxPersistentPerHost = 16
agent = Agent(reactor, pool)
locks = defaultdict(DeferredLock)
codes = {}
def getLock(url, simultaneous = 1):
return locks[urlparse(url).netloc, randrange(simultaneous)]
@inlineCallbacks
def getMapping(url):
# Limit ourselves to 4 simultaneous connections per host
# Tweak this number, but it should be no larger than pool.maxPersistentPerHost
lock = getLock(url,4)
yield lock.acquire()
try:
resp = yield agent.request('HEAD', url)
codes[url] = resp.code
except Exception as e:
codes[url] = str(e)
finally:
lock.release()
dl = DeferredList(getMapping(url.strip()) for url in fileinput.input())
dl.addCallback(lambda _: reactor.stop())
reactor.run()
pprint(codes)
가장 쉬운 방법은 파이썬의 내장 스레딩 라이브러리를 사용하는 것입니다. 그것들은 "실제"/ 커널 스레드가 아닙니다. (직렬화와 같은) 문제가 있지만 충분합니다. 큐 및 스레드 풀이 필요합니다. 하나의 옵션이 여기 있지만 자신의 것을 작성하는 것은 쉽지 않습니다. 100,000 개의 통화를 모두 병렬 처리 할 수는 없지만 동시에 100 개 정도의 통화를 해제 할 수 있습니다.
'IT story' 카테고리의 다른 글
XML 속성 대 XML 요소 (0) | 2020.04.07 |
---|---|
jQuery 날짜 / 시간 선택기 (0) | 2020.04.07 |
"테이블을 다시 작성해야하는 변경 내용 저장 방지"부정적인 영향 (0) | 2020.04.07 |
파이썬을 사용하여 직접 실행 가능한 크로스 플랫폼 GUI 앱을 어떻게 만들 수 있습니까? (0) | 2020.04.07 |
하나의 열에서 SELECT DISTINCT (0) | 2020.04.07 |