IT story

파이썬에서 10 만 건의 HTTP 요청을 보내는 가장 빠른 방법은 무엇입니까?

hot-time 2020. 4. 7. 08:33
반응형

파이썬에서 10 만 건의 HTTP 요청을 보내는 가장 빠른 방법은 무엇입니까?


URL이 100,000 개인 파일을 열었습니다. 각 URL에 HTTP 요청을 보내고 상태 코드를 인쇄해야합니다. 저는 Python 2.6을 사용하고 있으며 지금까지 Python이 스레딩 / 동시성을 구현하는 많은 혼란스러운 방법을 살펴 보았습니다. 나는 파이썬 동시성 라이브러리를 보았지만이 프로그램을 올바르게 작성하는 방법을 알 수 없습니다. 누구나 비슷한 문제를 겪었습니까? 필자는 일반적으로 가능한 빨리 파이썬에서 수천 개의 작업을 수행하는 방법을 알아야한다고 생각합니다. '동시'를 의미한다고 생각합니다.


트위스트리스 솔루션 :

from urlparse import urlparse
from threading import Thread
import httplib, sys
from Queue import Queue

concurrent = 200

def doWork():
    while True:
        url = q.get()
        status, url = getStatus(url)
        doSomethingWithResult(status, url)
        q.task_done()

def getStatus(ourl):
    try:
        url = urlparse(ourl)
        conn = httplib.HTTPConnection(url.netloc)   
        conn.request("HEAD", url.path)
        res = conn.getresponse()
        return res.status, ourl
    except:
        return "error", ourl

def doSomethingWithResult(status, url):
    print status, url

q = Queue(concurrent * 2)
for i in range(concurrent):
    t = Thread(target=doWork)
    t.daemon = True
    t.start()
try:
    for url in open('urllist.txt'):
        q.put(url.strip())
    q.join()
except KeyboardInterrupt:
    sys.exit(1)

이것은 꼬인 솔루션보다 약간 빠르며 CPU를 덜 사용합니다.


토네이도 비동기 네트워킹 라이브러리를 사용하는 솔루션

from tornado import ioloop, httpclient

i = 0

def handle_request(response):
    print(response.code)
    global i
    i -= 1
    if i == 0:
        ioloop.IOLoop.instance().stop()

http_client = httpclient.AsyncHTTPClient()
for url in open('urls.txt'):
    i += 1
    http_client.fetch(url.strip(), handle_request, method='HEAD')
ioloop.IOLoop.instance().start()

스레드는 절대 답이 아닙니다. 전체 목표가 "가장 빠른 방법"인 경우 허용되지 않는 처리량 제한뿐만 아니라 프로세스 및 커널 병목 현상을 모두 제공합니다.

약간 twisted의 비동기 HTTP클라이언트는 훨씬 나은 결과를 제공합니다.


2010 년 이후이 게시물이 게시되어 다른 답변을 모두 시도하지는 않았지만 몇 가지를 시도했지만 python3.6을 사용하여 가장 잘 작동하는 것으로 나타났습니다.

AWS에서 실행되는 초당 약 150 개의 고유 도메인을 가져올 수있었습니다.

import pandas as pd
import concurrent.futures
import requests
import time

out = []
CONNECTIONS = 100
TIMEOUT = 5

tlds = open('../data/sample_1k.txt').read().splitlines()
urls = ['http://{}'.format(x) for x in tlds[1:]]

def load_url(url, timeout):
    ans = requests.head(url, timeout=timeout)
    return ans.status_code

with concurrent.futures.ThreadPoolExecutor(max_workers=CONNECTIONS) as executor:
    future_to_url = (executor.submit(load_url, url, TIMEOUT) for url in urls)
    time1 = time.time()
    for future in concurrent.futures.as_completed(future_to_url):
        try:
            data = future.result()
        except Exception as exc:
            data = str(type(exc))
        finally:
            out.append(data)

            print(str(len(out)),end="\r")

    time2 = time.time()

print(f'Took {time2-time1:.2f} s')
print(pd.Series(out).value_counts())

grequests를 사용하십시오 . 그것은 request + Gevent 모듈의 조합입니다.

GRequests를 사용하면 Gevent에 요청을 사용하여 비동기 HTTP 요청을 쉽게 만들 수 있습니다.

사용법은 간단합니다.

import grequests

urls = [
   'http://www.heroku.com',
   'http://tablib.org',
   'http://httpbin.org',
   'http://python-requests.org',
   'http://kennethreitz.com'
]

보내지 않은 요청 세트를 작성하십시오.

>>> rs = (grequests.get(u) for u in urls)

동시에 모두 보내기 :

>>> grequests.map(rs)
[<Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>]

이 문제를 해결하는 좋은 방법은 먼저 하나의 결과를 얻는 데 필요한 코드를 작성한 다음 스레딩 코드를 통합하여 응용 프로그램을 병렬화하는 것입니다.

완벽한 세계에서 이것은 단순히 나중에 처리하기 위해 사전 또는 목록으로 결과를 출력하는 10 만 개의 스레드를 동시에 시작한다는 것을 의미하지만 실제로이 방식으로 발행 할 수있는 병렬 HTTP 요청 수는 제한되어 있습니다. 로컬에서는 동시에 열 수있는 소켓 수, 파이썬 인터프리터가 허용하는 실행 스레드 수에 제한이 있습니다. 원격으로 모든 요청이 한 서버 또는 여러 서버에 대한 경우 동시 연결 수가 제한 될 수 있습니다. 이러한 제한 사항으로 인해 한 번에 적은 양의 URL 만 폴링하는 방식으로 스크립트를 작성해야 할 수도 있습니다 (다른 포스터에서 언급했듯이 100은 적절한 스레드 풀 크기 일 수 있음). 더 많은 것을 성공적으로 배포 할 수 있습니다).

이 디자인 패턴에 따라 위의 문제를 해결할 수 있습니다.

  1. 현재 실행중인 스레드 수 (threading.active_count ()를 통해 또는 스레드 객체를 데이터 구조로 푸시하여 스레드를 추적 할 수 있음)가 최대 동시 요청 수 (예 : 100)가 될 때까지 새 요청 스레드를 시작하는 스레드를 시작합니다. 그런 다음 짧은 시간 동안 절전 모드로 전환됩니다. 처리 할 URL이 더 이상 없으면이 스레드가 종료되어야합니다. 따라서 스레드는 계속 깨어나고 새 스레드를 시작하며 완료 될 때까지 대기합니다.
  2. 요청 스레드가 결과를 나중에 검색 및 출력 할 수 있도록 일부 데이터 구조에 저장하도록하십시오. 결과를 저장하는 구조가 list또는 dictCPython 인 경우 잠금없이 스레드에서 고유 항목을 안전하게 추가하거나 삽입 할 수 있지만 파일에 쓰거나보다 복잡한 크로스 스레드 데이터 상호 작용 이 필요한 경우 이 상태를 손상으로부터 보호하기위한 상호 배제 잠금 .

스레딩 모듈 을 사용하는 것이 좋습니다 . 이를 사용하여 실행중인 스레드를 시작하고 추적 할 수 있습니다. 파이썬의 스레딩 지원은 거의 없지만 문제에 대한 설명은 귀하의 요구에 충분하다는 것을 암시합니다.

마지막으로, 파이썬으로 작성된 병렬 네트워크 애플리케이션의 간단한 애플리케이션을 보려면 ssh.py를 확인하십시오 . Python 스레딩을 사용하여 많은 SSH 연결을 병렬화하는 작은 라이브러리입니다. 디자인은 요구 사항에 충분히 근접하여 좋은 리소스가 될 수 있습니다.


최상의 성능을 얻으려면 스레드 대신 비동기 I / O를 사용하는 것이 좋습니다. 수천 개의 OS 스레드와 관련된 오버 헤드는 사소한 것이 아니며 Python 인터프리터 내의 컨텍스트 전환은 그 위에 훨씬 더 추가됩니다. 스레딩은 확실히 작업을 수행하지만 비동기 경로가 더 나은 전반적인 성능을 제공 할 것으로 생각합니다.

특히, Twisted 라이브러리 ( http://www.twistedmatrix.com ) 의 비동기 웹 클라이언트를 제안합니다 . 그것은 확실히 가파른 학습 곡선을 가지고 있지만 Twisted의 비동기 프로그래밍 스타일을 잘 다루면 사용하기가 쉽습니다.

Twisted의 비동기 웹 클라이언트 API에 대한 HowTo는 다음에서 구할 수 있습니다.

http://twistedmatrix.com/documents/current/web/howto/client.html


해결책 :

from twisted.internet import reactor, threads
from urlparse import urlparse
import httplib
import itertools


concurrent = 200
finished=itertools.count(1)
reactor.suggestThreadPoolSize(concurrent)

def getStatus(ourl):
    url = urlparse(ourl)
    conn = httplib.HTTPConnection(url.netloc)   
    conn.request("HEAD", url.path)
    res = conn.getresponse()
    return res.status

def processResponse(response,url):
    print response, url
    processedOne()

def processError(error,url):
    print "error", url#, error
    processedOne()

def processedOne():
    if finished.next()==added:
        reactor.stop()

def addTask(url):
    req = threads.deferToThread(getStatus, url)
    req.addCallback(processResponse, url)
    req.addErrback(processError, url)   

added=0
for url in open('urllist.txt'):
    added+=1
    addTask(url.strip())

try:
    reactor.run()
except KeyboardInterrupt:
    reactor.stop()

시험 시간 :

[kalmi@ubi1:~] wc -l urllist.txt
10000 urllist.txt
[kalmi@ubi1:~] time python f.py > /dev/null 

real    1m10.682s
user    0m16.020s
sys 0m10.330s
[kalmi@ubi1:~] head -n 6 urllist.txt
http://www.google.com
http://www.bix.hu
http://www.godaddy.com
http://www.google.com
http://www.bix.hu
http://www.godaddy.com
[kalmi@ubi1:~] python f.py | head -n 6
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu

핑 타임 :

bix.hu is ~10 ms away from me
godaddy.com: ~170 ms
google.com: ~30 ms

나는 이것이 오래된 질문이라는 것을 알고 있지만 Python 3.7에서는 asyncioand를 사용 하여이 작업을 수행 할 수 있습니다 aiohttp.

import asyncio
import aiohttp
from aiohttp import ClientSession, ClientConnectorError

async def fetch_html(url: str, session: ClientSession, **kwargs) -> tuple:
    try:
        resp = await session.request(method="GET", url=url, **kwargs)
    except ClientConnectorError:
        return (url, 404)
    return (url, resp.status)

async def make_requests(urls: set, **kwargs) -> None:
    async with ClientSession() as session:
        tasks = []
        for url in urls:
            tasks.append(
                fetch_html(url=url, session=session, **kwargs)
            )
        results = await asyncio.gather(*tasks)

    for result in results:
        print(f'{result[1]} - {str(result[0])}')

if __name__ == "__main__":
    import pathlib
    import sys

    assert sys.version_info >= (3, 7), "Script requires Python 3.7+."
    here = pathlib.Path(__file__).parent

    with open(here.joinpath("urls.txt")) as infile:
        urls = set(map(str.strip, infile))

    asyncio.run(make_requests(urls=urls))

이에 대한 자세한 내용은 여기 에서 예를 참조 하십시오 .


스레드 풀을 사용하는 것이 좋은 옵션이며 상당히 쉽습니다. 불행히도 파이썬에는 스레드 풀을 매우 쉽게 만드는 표준 라이브러리가 없습니다. 그러나 여기에 시작 해야하는 괜찮은 라이브러리가 있습니다 : http://www.chrisarndt.de/projects/threadpool/

그들의 사이트에서 코드 예제 :

pool = ThreadPool(poolsize)
requests = makeRequests(some_callable, list_of_args, callback)
[pool.putRequest(req) for req in requests]
pool.wait()

도움이 되었기를 바랍니다.


만들기 epoll개체,
오픈 많은 클라이언트 TCP 소켓을
요청 헤더보다 조금 더 될 자신의 전송 버퍼를 조정
요청 헤더를 보낼 - 그것은 단지 버퍼에 배치, 즉시 수에 소켓을 등록해야 epoll객체,
수행 .pollepollobect,
처음 3 읽기 바이트에서 각각의 소켓에서 .poll,
에 기록 sys.stdout다음 \n(플러시하지 않습니다), 가까운 클라이언트 소켓.

동시에 열린 소켓 수 제한 — 소켓을 만들 때 오류를 처리합니다. 다른 소켓이 닫힌 경우에만 새 소켓을 작성하십시오.
OS 한계를 조정하십시오.
몇 개 (많지 않은) 프로세스로 분기 해보십시오. 이렇게하면 CPU를 좀 더 효과적으로 사용할 수 있습니다.


귀하의 경우, 스레딩은 아마도 응답을 기다리는 데 대부분의 시간을 소비 할 것이기 때문에 아마도 트릭을 할 것입니다. 표준 라이브러리에는 Queue 와 같은 유용한 모듈 이 있습니다.

나는 파일을 병렬로 다운로드하는 것과 비슷한 일을했고 나에게 충분했지만 당신이 말하는 규모에는 맞지 않았다.

작업이 더 많은 CPU 바운드 인 경우 더 많은 CPU / 코어 / 스레드를 사용할 수 있는 멀티 프로세싱 모듈을 살펴볼 수 있습니다 (프로세스 당 잠금이므로 서로 차단하지 않는 더 많은 프로세스)


사용을 고려 풍차 풍차 아마 많은 스레드를 수행하지 못할지라도.

5 개의 머신에서 수동 롤링 된 Python 스크립트를 사용하여이를 수행 할 수 있습니다. 각 머신은 포트 40000-60000을 사용하여 아웃 바운드 연결하여 100,000 개의 포트 연결을 엽니 다.

또한 각 서버가 얼마나 많은 양을 처리 할 수 ​​있는지 알기 위해 OpenSTA 와 같이 훌륭하게 스레드 된 QA 앱으로 샘플 테스트를 수행하는 데 도움이 될 수 있습니다.

또한 LWP :: ConnCache 클래스와 함께 간단한 Perl을 사용하십시오. 그런 식으로 더 많은 성능 (더 많은 연결)을 얻을 수 있습니다.


이 꼬인 비동기 웹 클라이언트는 꽤 빠릅니다.

#!/usr/bin/python2.7

from twisted.internet import reactor
from twisted.internet.defer import Deferred, DeferredList, DeferredLock
from twisted.internet.defer import inlineCallbacks
from twisted.web.client import Agent, HTTPConnectionPool
from twisted.web.http_headers import Headers
from pprint import pprint
from collections import defaultdict
from urlparse import urlparse
from random import randrange
import fileinput

pool = HTTPConnectionPool(reactor)
pool.maxPersistentPerHost = 16
agent = Agent(reactor, pool)
locks = defaultdict(DeferredLock)
codes = {}

def getLock(url, simultaneous = 1):
    return locks[urlparse(url).netloc, randrange(simultaneous)]

@inlineCallbacks
def getMapping(url):
    # Limit ourselves to 4 simultaneous connections per host
    # Tweak this number, but it should be no larger than pool.maxPersistentPerHost 
    lock = getLock(url,4)
    yield lock.acquire()
    try:
        resp = yield agent.request('HEAD', url)
        codes[url] = resp.code
    except Exception as e:
        codes[url] = str(e)
    finally:
        lock.release()


dl = DeferredList(getMapping(url.strip()) for url in fileinput.input())
dl.addCallback(lambda _: reactor.stop())

reactor.run()
pprint(codes)

가장 쉬운 방법은 파이썬의 내장 스레딩 라이브러리를 사용하는 것입니다. 그것들은 "실제"/ 커널 스레드가 아닙니다. (직렬화와 같은) 문제가 있지만 충분합니다. 큐 및 스레드 풀이 필요합니다. 하나의 옵션이 여기 있지만 자신의 것을 작성하는 것은 쉽지 않습니다. 100,000 개의 통화를 모두 병렬 처리 할 수는 없지만 동시에 100 개 정도의 통화를 해제 할 수 있습니다.

참고 URL : https://stackoverflow.com/questions/2632520/what-is-the-fastest-way-to-send-100-000-http-requests-in-python

반응형