Mam hosting Flask bez dostępu do cron
poleceń.
Jak mogę co godzinę wykonywać jakąś funkcję Pythona?
Mam hosting Flask bez dostępu do cron
poleceń.
Jak mogę co godzinę wykonywać jakąś funkcję Pythona?
Odpowiedzi:
Można skorzystać BackgroundScheduler()
z APScheduler opakowania (v3.5.3):
import time
import atexit
from apscheduler.schedulers.background import BackgroundScheduler
def print_date_time():
print(time.strftime("%A, %d. %B %Y %I:%M:%S %p"))
scheduler = BackgroundScheduler()
scheduler.add_job(func=print_date_time, trigger="interval", seconds=3)
scheduler.start()
# Shut down the scheduler when exiting the app
atexit.register(lambda: scheduler.shutdown())
Zauważ, że dwa z tych programów planujących zostaną uruchomione, gdy Flask będzie w trybie debugowania. Aby uzyskać więcej informacji, sprawdź to pytanie.
flask
miał App.runonce
lub App.runForNseconds
mógłbyś przełączać się między schedule
a
Możesz wykorzystać APScheduler
w swojej aplikacji Flask i uruchamiać zadania za pośrednictwem jego interfejsu:
import atexit
# v2.x version - see https://stackoverflow.com/a/38501429/135978
# for the 3.x version
from apscheduler.scheduler import Scheduler
from flask import Flask
app = Flask(__name__)
cron = Scheduler(daemon=True)
# Explicitly kick off the background thread
cron.start()
@cron.interval_schedule(hours=1)
def job_function():
# Do your work here
# Shutdown your cron thread if the web process is stopped
atexit.register(lambda: cron.shutdown(wait=False))
if __name__ == '__main__':
app.run()
lambda
zasięgu atexit.register
?
atexit.register
potrzebuje funkcji do wywołania. Gdybyśmy właśnie zdali cron.shutdown(wait=False)
, przekazalibyśmy wynik wywołania cron.shutdown
(co jest prawdopodobnie None
). Zamiast tego przekazujemy funkcję o zerowym argumencie i zamiast nadawać jej nazwę i używać instrukcji, def shutdown(): cron.shutdown(wait=False)
a atexit.register(shutdown)
zamiast tego rejestrujemy ją w linii lambda:
(co jest wyrażeniem funkcji o zerowym argumencie ).
Jestem trochę nowy w koncepcji harmonogramów aplikacji, ale to, co znalazłem tutaj dla APScheduler v3.3.1 , jest trochę inne. Uważam, że w najnowszych wersjach zmieniła się struktura pakietów, nazwy klas itp., Dlatego zamieszczam tutaj świeże rozwiązanie, które wykonałem niedawno, zintegrowane z podstawową aplikacją Flaska:
#!/usr/bin/python3
""" Demonstrating Flask, using APScheduler. """
from apscheduler.schedulers.background import BackgroundScheduler
from flask import Flask
def sensor():
""" Function for test purposes. """
print("Scheduler is alive!")
sched = BackgroundScheduler(daemon=True)
sched.add_job(sensor,'interval',minutes=60)
sched.start()
app = Flask(__name__)
@app.route("/home")
def home():
""" Function for test purposes. """
return "Welcome Home :) !"
if __name__ == "__main__":
app.run()
Zostawiam tutaj również streszczenie , jeśli ktoś jest zainteresowany aktualizacjami dla tego przykładu.
Oto kilka odniesień do przyszłych lektur:
apscheduler.schedulers.background
, ponieważ może się zdarzyć, że napotkasz inne przydatne scenariusze dla Twojej aplikacji. Pozdrowienia.
Możesz spróbować użyć BackgroundScheduler APScheduler, aby zintegrować zadanie interwałowe z aplikacją Flask. Poniżej znajduje się przykład wykorzystujący blueprint i fabrykę aplikacji ( init .py):
from datetime import datetime
# import BackgroundScheduler
from apscheduler.schedulers.background import BackgroundScheduler
from flask import Flask
from webapp.models.main import db
from webapp.controllers.main import main_blueprint
# define the job
def hello_job():
print('Hello Job! The time is: %s' % datetime.now())
def create_app(object_name):
app = Flask(__name__)
app.config.from_object(object_name)
db.init_app(app)
app.register_blueprint(main_blueprint)
# init BackgroundScheduler job
scheduler = BackgroundScheduler()
# in your case you could change seconds to hours
scheduler.add_job(hello_job, trigger='interval', seconds=3)
scheduler.start()
try:
# To keep the main thread alive
return app
except:
# shutdown if app occurs except
scheduler.shutdown()
Mam nadzieję, że to pomoże :)
Odniesienie:
Aby uzyskać proste rozwiązanie, możesz dodać trasę, taką jak
@app.route("/cron/do_the_thing", methods=['POST'])
def do_the_thing():
logging.info("Did the thing")
return "OK", 200
Następnie dodaj zadanie unix cron, które okresowo POSTuje na tym punkcie końcowym. Na przykład, aby uruchamiać go raz na minutę, w typie terminala crontab -e
i dodać tę linię:
* * * * * /opt/local/bin/curl -X POST https://YOUR_APP/cron/do_the_thing
(Zwróć uwagę, że ścieżka do zwijania musi być kompletna, ponieważ po uruchomieniu zadania nie będzie mieć Twojej ŚCIEŻKI. Pełną ścieżkę do zwijania w systemie można znaleźć, klikając which curl
)
Podoba mi się to, że łatwo jest przetestować zadanie ręcznie, nie ma dodatkowych zależności, a ponieważ nie dzieje się nic specjalnego, łatwo jest to zrozumieć.
Jeśli chcesz zabezpieczyć hasłem swoje zadanie cron, możesz pip install Flask-BasicAuth
, a następnie dodać poświadczenia do konfiguracji aplikacji:
app = Flask(__name__)
app.config['BASIC_AUTH_REALM'] = 'realm'
app.config['BASIC_AUTH_USERNAME'] = 'falken'
app.config['BASIC_AUTH_PASSWORD'] = 'joshua'
Aby zabezpieczyć hasłem punkt końcowy zadania:
from flask_basicauth import BasicAuth
basic_auth = BasicAuth(app)
@app.route("/cron/do_the_thing", methods=['POST'])
@basic_auth.required
def do_the_thing():
logging.info("Did the thing a bit more securely")
return "OK", 200
Następnie, aby zadzwonić do niego z pracy crona:
* * * * * /opt/local/bin/curl -X POST https://falken:joshua@YOUR_APP/cron/do_the_thing
Inną alternatywą może być użycie Flask-APScheduler, który dobrze współgra z Flaskiem, np .:
Więcej informacji tutaj:
Kompletny przykład wykorzystujący harmonogram i przetwarzanie wieloprocesowe, ze sterowaniem włączaniem i wyłączaniem oraz parametrem funkcji run_job (), kody powrotne są uproszczone, a interwał jest ustawiony na 10 sekund, a następnie every(2).hour.do()
na 2 godziny. Harmonogram jest imponujący, nie dryfuje i nigdy nie widziałem, aby podczas planowania był oddalony o więcej niż 100 ms. Używanie wieloprocesorowości zamiast wątkowania, ponieważ ma metodę kończenia.
#!/usr/bin/env python3
import schedule
import time
import datetime
import uuid
from flask import Flask, request
from multiprocessing import Process
app = Flask(__name__)
t = None
job_timer = None
def run_job(id):
""" sample job with parameter """
global job_timer
print("timer job id={}".format(id))
print("timer: {:.4f}sec".format(time.time() - job_timer))
job_timer = time.time()
def run_schedule():
""" infinite loop for schedule """
global job_timer
job_timer = time.time()
while 1:
schedule.run_pending()
time.sleep(1)
@app.route('/timer/<string:status>')
def mytimer(status, nsec=10):
global t, job_timer
if status=='on' and not t:
schedule.every(nsec).seconds.do(run_job, str(uuid.uuid4()))
t = Process(target=run_schedule)
t.start()
return "timer on with interval:{}sec\n".format(nsec)
elif status=='off' and t:
if t:
t.terminate()
t = None
schedule.clear()
return "timer off\n"
return "timer status not changed\n"
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000)
Testujesz to, wydając po prostu:
$ curl http://127.0.0.1:5000/timer/on
timer on with interval:10sec
$ curl http://127.0.0.1:5000/timer/on
timer status not changed
$ curl http://127.0.0.1:5000/timer/off
timer off
$ curl http://127.0.0.1:5000/timer/off
timer status not changed
Co 10 sekund licznik czasu jest włączony, wysyła komunikat licznika do konsoli:
127.0.0.1 - - [18/Sep/2018 21:20:14] "GET /timer/on HTTP/1.1" 200 -
timer job id=b64ed165-911f-4b47-beed-0d023ead0a33
timer: 10.0117sec
timer job id=b64ed165-911f-4b47-beed-0d023ead0a33
timer: 10.0102sec
Możesz chcieć użyć jakiegoś mechanizmu kolejkowania z harmonogramem, takim jak planista RQ lub czymś cięższym, jak Seler (najprawdopodobniej przesada).