Eldovyn
public networking is not available
from flask import Flask
from flask_mongoengine import MongoEngine
from routes.license import license_router
from routes.user import user_router
from celery import Celery, Task
from models import LicenseModel
from configs import REDIS_URL, MONGODB_URL, DB_NAME
def celery_init_app(app: Flask) -> Celery:
    """Create a Celery application tied to the given Flask application.

    Each task body runs inside ``app.app_context()``. Celery settings are
    read from ``app.config["CELERY"]`` and the resulting instance is stored
    under ``app.extensions["celery"]``.

    Args:
        app: Flask application whose config contains a ``"CELERY"`` entry.

    Returns:
        The configured Celery application.
    """

    class FlaskTask(Task):
        """Task base class that wraps every run in the Flask app context."""

        def __call__(self, *task_args: object, **task_kwargs: object) -> object:
            # Keep an application context active while the task body runs.
            with app.app_context():
                return self.run(*task_args, **task_kwargs)

    worker = Celery(app.name, task_cls=FlaskTask)
    worker.config_from_object(app.config["CELERY"])
    app.extensions["celery"] = worker
    return worker
# MongoEngine handle; attached to the Flask app through init_app() below.
db = MongoEngine()

app = Flask(__name__)

# Database name and connection host come from the configs module.
app.config["MONGODB_SETTINGS"] = dict(db=DB_NAME, host=MONGODB_URL)
db.init_app(app)

# Celery settings: broker and result backend both point at REDIS_URL, and
# task results are ignored by default.
app.config["CELERY"] = {
    "broker_url": REDIS_URL,
    "result_backend": REDIS_URL,
    "task_ignore_result": True,
}
celery_app = celery_init_app(app)
@celery_app.task
def background_task(data):
    """Sleep for the requested duration, then delete the matching license.

    Args:
        data: Mapping read with the keys ``"duration"`` (passed to
            ``time.sleep``) and ``"license"`` (used as the ``license``
            filter on ``LicenseModel``).

    Returns:
        A confirmation string containing ``data["license"]``. It is returned
        whether or not a matching document was found.
    """
    from time import sleep

    sleep(data["duration"])

    # Look up the first document with this license value; delete it if any.
    record = LicenseModel.objects(license=data["license"]).first()
    if record:
        record.delete()
    return f'successfully deleted {data["license"]}'
@app.after_request
async def add_cors_headers(response):
    """Set the CORS headers on every outgoing response.

    Args:
        response: The response object handed to the after-request hook.

    Returns:
        The same response object with the three CORS headers set.
    """
    cors_headers = {
        "Access-Control-Allow-Origin": "*",
        "Access-Control-Allow-Methods": "GET, POST, PUT, DELETE, OPTIONS",
        "Access-Control-Allow-Headers": "Content-Type, X-API-KEY, X-LICENSE",
    }
    for header_name, header_value in cors_headers.items():
        response.headers[header_name] = header_value
    return response
@app.route("/")
async def home():
    """Return the plain-text welcome message for the root URL."""
    greeting = "welcome to auto store api by nexblu store"
    return greeting
# Mount the license and user routes on the application.
app.register_blueprint(license_router)
app.register_blueprint(user_router)

if __name__ == "__main__":
    import os

    # Development server entry point. The port and debug flag can be
    # overridden through the PORT and FLASK_DEBUG environment variables;
    # without them the server listens on port 5000 with debug enabled.
    # Set FLASK_DEBUG=0 anywhere the server is reachable by others: the
    # interactive debugger allows arbitrary code execution.
    debug_enabled = os.environ.get("FLASK_DEBUG", "1").strip().lower() not in {
        "0",
        "false",
        "no",
        "off",
    }
    app.run(debug=debug_enabled, port=int(os.environ.get("PORT", "5000")))
from flask import Flask
from flask_mongoengine import MongoEngine
from routes.license import license_router
from routes.user import user_router
from celery import Celery, Task
from models import LicenseModel
from configs import REDIS_URL, MONGODB_URL, DB_NAME
def celery_init_app(app: Flask) -> Celery:
    """Create a Celery application tied to the given Flask application.

    Each task body runs inside ``app.app_context()``. Celery settings are
    read from ``app.config["CELERY"]`` and the resulting instance is stored
    under ``app.extensions["celery"]``.

    Args:
        app: Flask application whose config contains a ``"CELERY"`` entry.

    Returns:
        The configured Celery application.
    """

    class FlaskTask(Task):
        """Task base class that wraps every run in the Flask app context."""

        def __call__(self, *task_args: object, **task_kwargs: object) -> object:
            # Keep an application context active while the task body runs.
            with app.app_context():
                return self.run(*task_args, **task_kwargs)

    worker = Celery(app.name, task_cls=FlaskTask)
    worker.config_from_object(app.config["CELERY"])
    app.extensions["celery"] = worker
    return worker
# MongoEngine handle; attached to the Flask app through init_app() below.
db = MongoEngine()

app = Flask(__name__)

# Database name and connection host come from the configs module.
app.config["MONGODB_SETTINGS"] = dict(db=DB_NAME, host=MONGODB_URL)
db.init_app(app)

# Celery settings: broker and result backend both point at REDIS_URL, and
# task results are ignored by default.
app.config["CELERY"] = {
    "broker_url": REDIS_URL,
    "result_backend": REDIS_URL,
    "task_ignore_result": True,
}
celery_app = celery_init_app(app)
@celery_app.task
def background_task(data):
    """Sleep for the requested duration, then delete the matching license.

    Args:
        data: Mapping read with the keys ``"duration"`` (passed to
            ``time.sleep``) and ``"license"`` (used as the ``license``
            filter on ``LicenseModel``).

    Returns:
        A confirmation string containing ``data["license"]``. It is returned
        whether or not a matching document was found.
    """
    from time import sleep

    sleep(data["duration"])

    # Look up the first document with this license value; delete it if any.
    record = LicenseModel.objects(license=data["license"]).first()
    if record:
        record.delete()
    return f'successfully deleted {data["license"]}'
@app.after_request
async def add_cors_headers(response):
    """Set the CORS headers on every outgoing response.

    Args:
        response: The response object handed to the after-request hook.

    Returns:
        The same response object with the three CORS headers set.
    """
    cors_headers = {
        "Access-Control-Allow-Origin": "*",
        "Access-Control-Allow-Methods": "GET, POST, PUT, DELETE, OPTIONS",
        "Access-Control-Allow-Headers": "Content-Type, X-API-KEY, X-LICENSE",
    }
    for header_name, header_value in cors_headers.items():
        response.headers[header_name] = header_value
    return response
@app.route("/")
async def home():
    """Return the plain-text welcome message for the root URL."""
    greeting = "welcome to auto store api by nexblu store"
    return greeting
# Mount the license and user routes on the application.
app.register_blueprint(license_router)
app.register_blueprint(user_router)

if __name__ == "__main__":
    import os

    # Development server entry point. The port and debug flag can be
    # overridden through the PORT and FLASK_DEBUG environment variables;
    # without them the server listens on port 5000 with debug enabled.
    # Set FLASK_DEBUG=0 anywhere the server is reachable by others: the
    # interactive debugger allows arbitrary code execution.
    debug_enabled = os.environ.get("FLASK_DEBUG", "1").strip().lower() not in {
        "0",
        "false",
        "no",
        "off",
    }
    app.run(debug=debug_enabled, port=int(os.environ.get("PORT", "5000")))
14 replies
public networking is not available
{
"$schema": "https://railway.app/railway.schema.json",
"build": {
"builder": "NIXPACKS"
},
"deploy": {
"startCommand": "gunicorn main:app",
"restartPolicyType": "ON_FAILURE",
"restartPolicyMaxRetries": 10
}
}
{
"$schema": "https://railway.app/railway.schema.json",
"build": {
"builder": "NIXPACKS"
},
"deploy": {
"startCommand": "gunicorn main:app",
"restartPolicyType": "ON_FAILURE",
"restartPolicyMaxRetries": 10
}
}
14 replies