sounds good, I'll share some info to set that up when you work on it
DenDen 🏎 · 16mo ago
some info / steps helpful to set up an ingestion service:

1. Libraries: you can use requests for ingesting data from the REST API, pymongo to communicate with MongoDB, and apscheduler for scheduling your data-refresh job.

2. Request Data from API:
import requests

def get_data_from_api(endpoint):
    response = requests.get(endpoint)
    return response.json()  # convert response to JSON

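In practice you'll probably also want to fail loudly on HTTP errors and network stalls instead of silently parsing an error body; a sketch of the same function with those checks (the timeout value is just an example):

```python
import requests

def get_data_from_api(endpoint, timeout=10):
    # Give up on stalled connections after 'timeout' seconds
    response = requests.get(endpoint, timeout=timeout)
    response.raise_for_status()  # raise an exception on 4xx/5xx responses
    return response.json()  # convert response to JSON
```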
3. Connect to MongoDB:
from pymongo import MongoClient

def connect_to_mongo(uri):
    client = MongoClient(uri)  # uri is your MongoDB Atlas URL
    return client


4. Store Data into MongoDB: Iterate through the JSON response and insert each item into the database.
def store_data_to_mongo(db, data):
    # Assuming 'data' is a list of dictionaries
    for item in data:
        db.collection.insert_one(item)  # replace 'collection' with your collection name


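As a design note, insert_one costs one round trip to the database per item; pymongo's insert_many sends the whole batch in a single call. A sketch of the same step as a bulk insert (insert_many rejects an empty list, so guard for that):

```python
def store_data_to_mongo(db, data):
    # Assuming 'data' is a list of dictionaries
    if data:  # insert_many rejects an empty list
        db.collection.insert_many(data)  # replace 'collection' with your collection name
```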
5. Set Up Scheduler: With APScheduler, you can set the function to run every 6 hours like this:
from apscheduler.schedulers.blocking import BlockingScheduler

def job():
    client = connect_to_mongo(uri)  # 'uri' is your MongoDB Atlas URL from step 3
    db = client['your_db']  # replace 'your_db' with your database name
    data = get_data_from_api(api_endpoint)  # 'api_endpoint' is your REST API URL
    store_data_to_mongo(db, data)

scheduler = BlockingScheduler()
scheduler.add_job(job, 'interval', hours=6)
scheduler.start()


6. API Pagination: APIs often limit the amount of data returned for a single request. Therefore, it's necessary to handle API pagination. If the API uses page numbers, you can loop through pages like this:
def get_data_from_api(endpoint, page):
    response = requests.get(f"{endpoint}?page={page}")
    return response.json()

page = 1
while True:
    data = get_data_from_api(endpoint, page)
    if not data:  # an empty page means there is no more data
        break
    store_data_to_mongo(db, data)
    page += 1

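Not every API paginates by page number; some return a cursor or "next" token with each response instead. A minimal sketch of that pattern, written against a fetch_page callable so the token handling stays API-agnostic (the "results" and "next" keys are assumptions here, adjust them to your API's response shape):

```python
def fetch_all_pages(fetch_page):
    # fetch_page(cursor) should return a dict like
    # {"results": [...], "next": <token or None>}; pass None for the first page.
    items, cursor = [], None
    while True:
        payload = fetch_page(cursor)
        items.extend(payload.get("results", []))
        cursor = payload.get("next")
        if not cursor:  # no token means this was the last page
            break
    return items
```

Here fetch_page would wrap requests.get, passing the cursor in whatever query parameter your API expects.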


7. Deploying the script: Finally, deploy the script on a server, where it will run continuously, poll the API, and refresh the data in MongoDB every 6 hours.