From fad6ab856a8c40004ef58e7548f9f6aefe4a9b3a Mon Sep 17 00:00:00 2001
From: surtur
Date: Tue, 11 Aug 2020 14:25:00 +0200
Subject: [PATCH] chore: adding metric grabber + celery runner

---
 app/celery.py       |  7 +++++
 app/ultrametrics.py | 65 +++++++++++++++++++++++++++++++++++++++++++++
 nurun.sh            |  5 ++++
 3 files changed, 77 insertions(+)
 create mode 100644 app/celery.py
 create mode 100644 app/ultrametrics.py
 create mode 100644 nurun.sh

diff --git a/app/celery.py b/app/celery.py
new file mode 100644
index 0000000..62773db
--- /dev/null
+++ b/app/celery.py
@@ -0,0 +1,7 @@
+from __future__ import absolute_import
+from celery import Celery
+
+app = Celery('app',
+             broker='amqp://cl:cl1234@localhost/cl_vhost',
+             backend='rpc://',
+             include=['app.tasks'])
diff --git a/app/ultrametrics.py b/app/ultrametrics.py
new file mode 100644
index 0000000..97290bd
--- /dev/null
+++ b/app/ultrametrics.py
@@ -0,0 +1,65 @@
+import asyncio, logging, time
+from datetime import datetime
+from threading import Thread
+from typing import Optional, Tuple, Sequence
+
+import aiohttp
+
+logging.basicConfig(format='%(levelname)s:%(message)s', filename='example.log', filemode='w', level=logging.INFO)
+
+URLS = [
+    "https://python.org",
+    "https://google.com",
+    "https://stackoverflow.com",
+    'https://dotya.ml',
+    'https://git.dotya.ml',
+    'https://drone.dotya.ml',
+    'https://netdata.dotya.ml',
+    'https://tew0x00.dotya.ml',
+    'https://speedtest.dotya.ml',
+    'https://testdotya.dotya.ml',
+    "https://ubuntu.com",
+]
+
+def start_background_loop(loop: asyncio.AbstractEventLoop) -> None:
+    asyncio.set_event_loop(loop)
+    loop.run_forever()
+
+
+async def fetch(url: str, session: aiohttp.ClientSession = None) -> Tuple[str, str]:
+    async def _fetch(url: str, session: aiohttp.ClientSession):
+        s = time.time()
+        async with session.head(url) as response:
+            f = time.time() - s
+            m = ('[ %f ]\t<%s>\ttook: %f\t(%s)' %(time.time(),response.status,f,url))
+            print(m)
+            logging.info(m)
+            return url
+
+
+    if session:
+        return await _fetch(url, session)
+    else:
+        async with aiohttp.ClientSession() as session:
+            return await _fetch(url, session)
+
+async def fetch_urls(loop: asyncio.AbstractEventLoop) -> Sequence[Tuple[str, str]]:
+    async with aiohttp.ClientSession() as session:
+        tasks = [loop.create_task(fetch(url, session)) for url in URLS]
+        results = await asyncio.gather(*tasks)
+        return results
+
+def main() -> None:
+    loop = asyncio.new_event_loop()
+    t = Thread(target=start_background_loop, args=(loop,), daemon=True)
+    t.start()
+
+
+
+    while True:
+        task = asyncio.run_coroutine_threadsafe(fetch_urls(loop), loop)
+        time.sleep(10)
+
+
+if __name__ == '__main__':
+    main()
diff --git a/nurun.sh b/nurun.sh
new file mode 100644
index 0000000..742155a
--- /dev/null
+++ b/nurun.sh
@@ -0,0 +1,5 @@
+#!/bin/bash
+
+celery worker -A app.celery --loglevel=info &
+sleep 5
+python -m app.run_tasks &