Files
calorie_tracker/calorie_tracker_app/api_client.py
2026-01-30 15:03:43 +08:00

210 lines
7.1 KiB
Python

import requests
import json
from models import db, APICache, FoodItem
from datetime import datetime, timedelta
class NutritionAPI:
    """Client for the API Ninjas nutrition endpoint with a database cache.

    Raw responses are stored in the ``APICache`` table under the lower-cased
    query and reused until they are ``cache_duration_days`` old.
    """

    def __init__(self, api_key):
        """Store the API key and the fixed request settings.

        Args:
            api_key: Key sent in the ``X-Api-Key`` header of every request.
        """
        self.api_key = api_key
        self.base_url = "https://api.api-ninjas.com/v1/nutrition"
        self.headers = {'X-Api-Key': api_key}
        self.cache_duration_days = 30

    def search_food(self, query):
        """Search for nutrition data, preferring a fresh cache entry.

        Args:
            query: Free-text food description.

        Returns:
            A list of standardized food dicts (see ``_parse_api_response``).
            An empty list is returned for a blank query, a non-200 status,
            a failed request, or a response body that is not valid JSON.
        """
        # A blank query cannot match anything; skip both cache and network.
        if not query or not query.strip():
            return []

        # Compare against None so a cached *empty* result is honored too;
        # a truthiness test would re-query the API every time for such hits.
        cached = self._get_from_cache(query)
        if cached is not None:
            return cached

        try:
            response = requests.get(
                self.base_url,
                headers=self.headers,
                params={'query': query},
                timeout=10,
            )
            if response.status_code != 200:
                print(f"API Error: {response.status_code}")
                return []
            data = response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers a body that is not valid JSON on versions of
            # requests that do not wrap the decode error.
            print(f"API Request failed: {e}")
            return []

        # Only cache payloads of the expected shape so a malformed response
        # is not served from the cache for the whole cache lifetime.
        if isinstance(data, list):
            self._save_to_cache(query, 'api_ninjas', data)
        return self._parse_api_response(data)

    def _get_from_cache(self, query):
        """Return the parsed cached response for ``query``, or ``None``.

        ``None`` means "no usable entry": the row is missing, it is at least
        ``cache_duration_days`` old, or its stored JSON cannot be decoded.
        """
        cache_entry = APICache.query.filter_by(query=query.lower()).first()
        if cache_entry is None:
            return None

        # cached_at is written with datetime.utcnow(), so both sides are
        # naive UTC timestamps.
        age = datetime.utcnow() - cache_entry.cached_at
        if age.days >= self.cache_duration_days:
            return None

        try:
            data = json.loads(cache_entry.response_json)
        except (TypeError, ValueError):
            # Unreadable cache row: treat as a miss so the API is asked again.
            return None
        return self._parse_api_response(data)

    def _save_to_cache(self, query, source, data):
        """Insert or refresh the cache row for ``query``.

        Best effort: any failure is printed and the session is rolled back,
        so a cache problem never breaks the search itself.

        Args:
            query: Search text; stored lower-cased as the cache key.
            source: Label stored in ``api_source`` for newly created rows.
            data: JSON-serializable API payload.
        """
        try:
            key = query.lower()
            payload = json.dumps(data)
            now = datetime.utcnow()

            cache_entry = APICache.query.filter_by(query=key).first()
            if cache_entry:
                # Update existing cache
                cache_entry.response_json = payload
                cache_entry.cached_at = now
            else:
                # Create new cache entry
                cache_entry = APICache(
                    query=key,
                    api_source=source,
                    response_json=payload,
                    cached_at=now,
                )
                db.session.add(cache_entry)
            db.session.commit()
        except Exception as e:
            print(f"Cache save failed: {e}")
            db.session.rollback()

    @staticmethod
    def _field_or_default(item, key, default):
        """Return ``item[key]``, or ``default`` when it is missing or None."""
        value = item.get(key)
        return default if value is None else value

    def _parse_api_response(self, data):
        """Convert a raw API payload into the standardized food format.

        Args:
            data: Decoded JSON payload, expected to be a list of dicts.

        Returns:
            A list of dicts with the keys ``name``, ``calories``,
            ``protein_g``, ``carbs_g``, ``fat_g``, ``fiber_g``, ``sugar_g``,
            ``sodium_mg``, ``serving_size_g`` and ``source``. A payload that
            is not a list yields ``[]``; entries that are not dicts are
            skipped.

        NOTE(review): non-None values are passed through unchanged, so any
        non-numeric placeholder sent by the upstream API would reach callers
        as-is -- confirm against the API plan in use.
        """
        if not isinstance(data, list):
            return []

        pick = self._field_or_default
        foods = []
        for item in data:
            if not isinstance(item, dict):
                continue
            foods.append({
                # "or ''" guards against an explicit null name.
                'name': str(item.get('name') or '').title(),
                'calories': pick(item, 'calories', 0),
                'protein_g': pick(item, 'protein_g', 0),
                'carbs_g': pick(item, 'carbohydrates_total_g', 0),
                'fat_g': pick(item, 'fat_total_g', 0),
                'fiber_g': pick(item, 'fiber_g', 0),
                'sugar_g': pick(item, 'sugar_g', 0),
                'sodium_mg': pick(item, 'sodium_mg', 0),
                'serving_size_g': pick(item, 'serving_size_g', 100),
                'source': 'api_ninjas',
            })
        return foods

    def save_food_to_db(self, food_data):
        """Persist a food dict as a ``FoodItem`` unless it already exists.

        A row is considered existing when both its ``name`` and ``source``
        match ``food_data`` (source defaults to ``'api'``).

        Args:
            food_data: Dict with at least ``name`` and ``calories``; the
                remaining nutrition keys are optional.

        Returns:
            The existing or newly created ``FoodItem``, or ``None`` when
            saving failed (the error is printed and the session rolled back).
        """
        try:
            source = food_data.get('source', 'api')

            # Check if food already exists
            existing = FoodItem.query.filter_by(
                name=food_data['name'],
                source=source,
            ).first()
            if existing:
                return existing

            # Create new food item
            food = FoodItem(
                name=food_data['name'],
                calories=food_data['calories'],
                protein_g=food_data.get('protein_g', 0),
                carbs_g=food_data.get('carbs_g', 0),
                fat_g=food_data.get('fat_g', 0),
                fiber_g=food_data.get('fiber_g', 0),
                sugar_g=food_data.get('sugar_g', 0),
                sodium_mg=food_data.get('sodium_mg', 0),
                serving_size_g=food_data.get('serving_size_g', 100),
                serving_description=food_data.get('serving_description', '1 serving'),
                source=source,
                api_data=json.dumps(food_data),
            )
            db.session.add(food)
            db.session.commit()
            return food
        except Exception as e:
            print(f"Error saving food to DB: {e}")
            db.session.rollback()
            return None
def _like_pattern(query):
    """Build a ``%query%`` LIKE pattern with the query's own wildcards escaped.

    ``/`` is the escape character, so it is doubled first, then ``%`` and
    ``_`` are prefixed with it. Must be paired with ``escape='/'``.
    """
    escaped = (
        str(query)
        .replace('/', '//')
        .replace('%', '/%')
        .replace('_', '/_')
    )
    return f'%{escaped}%'


def search_all_sources(query, api_client):
    """Search for food in all available sources.

    Priority: Local DB (Filipino) -> Local DB (other) -> API. The API is
    only consulted when the local searches produced fewer than 3 results
    and ``api_client`` has an API key.

    Args:
        query: Text matched case-insensitively as a substring of the food
            name (and of the Tagalog name for Filipino foods).
        api_client: Object exposing ``api_key`` and ``search_food(query)``;
            ``None`` disables the API step.

    Returns:
        A list of result dicts: up to 5 Filipino foods, then up to 5 other
        local foods, then up to 5 API foods. Local results carry an ``id``;
        API results carry the full parsed item under ``api_data`` instead.
    """
    # Escape % and _ so they match literally instead of acting as wildcards.
    pattern = _like_pattern(query)

    # 1. Search Filipino foods first
    filipino_foods = FoodItem.query.filter(
        FoodItem.is_filipino == True,  # noqa: E712 - builds a SQL expression
        db.or_(
            FoodItem.name.ilike(pattern, escape='/'),
            FoodItem.name_tagalog.ilike(pattern, escape='/'),
        ),
    ).limit(5).all()
    results = [
        {
            'id': food.id,
            'name': food.name,
            'name_tagalog': food.name_tagalog,
            'calories': food.calories,
            'protein_g': food.protein_g,
            'carbs_g': food.carbs_g,
            'fat_g': food.fat_g,
            'serving_description': food.serving_description,
            'source': 'filipino',
            'category': food.category,
        }
        for food in filipino_foods
    ]

    # 2. Search other local foods
    other_foods = FoodItem.query.filter(
        FoodItem.is_filipino == False,  # noqa: E712 - builds a SQL expression
        FoodItem.name.ilike(pattern, escape='/'),
    ).limit(5).all()
    results.extend(
        {
            'id': food.id,
            'name': food.name,
            'calories': food.calories,
            'protein_g': food.protein_g,
            'carbs_g': food.carbs_g,
            'fat_g': food.fat_g,
            'serving_description': food.serving_description,
            'source': food.source,
        }
        for food in other_foods
    )

    # 3. If not enough results, search API (skipped without a client or key)
    if len(results) < 3 and api_client is not None and api_client.api_key:
        api_results = api_client.search_food(query)
        results.extend(
            {
                'name': food_data['name'],
                'calories': food_data['calories'],
                'protein_g': food_data['protein_g'],
                'carbs_g': food_data['carbs_g'],
                'fat_g': food_data['fat_g'],
                'serving_size_g': food_data['serving_size_g'],
                'source': 'api',
                'api_data': food_data,
            }
            for food_data in api_results[:5]
        )

    return results