Add more security and audit
Some checks failed
Build, Push & Deploy / Build & Push image (push) Failing after 56s
Build, Push & Deploy / Deploy naar VPS (push) Has been skipped
Build & Push / Build & Push image (push) Successful in 1m2s

This commit is contained in:
2026-02-28 14:47:33 +01:00
parent db87ea447a
commit 07bcfede75
11 changed files with 386 additions and 76 deletions

View File

@@ -1,4 +1,7 @@
FROM python:3.12-slim
# Pin op een specifieke patch versie voor reproduceerbare builds.
# Controleer regelmatig op https://hub.docker.com/_/python voor updates.
# Bij elke Python security patch: versienummer hier bijwerken + opnieuw builden.
FROM python:3.12.9-slim
WORKDIR /app
@@ -8,9 +11,10 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
gcc \
&& rm -rf /var/lib/apt/lists/*
# Python dependencies
# Python dependencies — upgrade pip zelf ook voor security fixes
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
RUN pip install --no-cache-dir --upgrade pip && \
pip install --no-cache-dir -r requirements.txt
# App code + entrypoint (chmod als root, vóór USER switch)
COPY . .

View File

@@ -1,65 +1,168 @@
import os
from flask import Flask
import logging
from flask import Flask, jsonify, request
from flask_sqlalchemy import SQLAlchemy
from flask_login import LoginManager
from flask_migrate import Migrate
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
from flask_talisman import Talisman
from werkzeug.middleware.proxy_fix import ProxyFix
db = SQLAlchemy()
logger = logging.getLogger(__name__)
db = SQLAlchemy()
login_manager = LoginManager()
migrate = Migrate()
migrate = Migrate()
limiter = Limiter(key_func=get_remote_address, default_limits=[])
def create_app():
app = Flask(__name__, template_folder='templates', static_folder='static')
# Config
app.config['SECRET_KEY'] = os.environ['SECRET_KEY']
app.config['SQLALCHEMY_DATABASE_URI'] = os.environ['DATABASE_URL']
# ── Config ────────────────────────────────────────────────────────────────
app.config['SECRET_KEY'] = os.environ['SECRET_KEY']
app.config['SQLALCHEMY_DATABASE_URI'] = os.environ['DATABASE_URL']
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['BASE_URL'] = os.environ.get('BASE_URL', 'http://localhost')
app.config['ORG_NAME'] = os.environ.get('ORG_NAME', 'GO! Scholengroep')
app.config['BASE_URL'] = os.environ.get('BASE_URL', 'http://localhost')
app.config['ORG_NAME'] = os.environ.get('ORG_NAME', 'GO! Scholengroep')
# OAuth2 config (voor later)
app.config['MICROSOFT_CLIENT_ID'] = os.environ.get('MICROSOFT_CLIENT_ID')
app.config['MICROSOFT_CLIENT_SECRET'] = os.environ.get('MICROSOFT_CLIENT_SECRET')
app.config['MICROSOFT_TENANT_ID'] = os.environ.get('MICROSOFT_TENANT_ID', 'common')
app.config['GOOGLE_CLIENT_ID'] = os.environ.get('GOOGLE_CLIENT_ID')
app.config['GOOGLE_CLIENT_SECRET'] = os.environ.get('GOOGLE_CLIENT_SECRET')
# OAuth2
app.config['MICROSOFT_CLIENT_ID'] = os.environ.get('MICROSOFT_CLIENT_ID')
app.config['MICROSOFT_CLIENT_SECRET'] = os.environ.get('MICROSOFT_CLIENT_SECRET')
app.config['MICROSOFT_TENANT_ID'] = os.environ.get('MICROSOFT_TENANT_ID', 'common')
# ProxyFix: Flask zit achter nginx als reverse proxy.
# x_for=1, x_proto=1 zorgt dat Flask de echte client IP en https ziet.
# Session cookie beveiliging
is_https = app.config['BASE_URL'].startswith('https')
app.config['SESSION_COOKIE_SECURE'] = is_https
app.config['SESSION_COOKIE_HTTPONLY'] = True
app.config['SESSION_COOKIE_SAMESITE'] = 'Lax' # Lax ipv Strict: compatibel met OAuth redirect
app.config['REMEMBER_COOKIE_SECURE'] = is_https
app.config['REMEMBER_COOKIE_HTTPONLY'] = True
app.config['REMEMBER_COOKIE_SAMESITE'] = 'Lax'
app.config['REMEMBER_COOKIE_DURATION'] = 86400 * 8 # 8 dagen max (was: onbeperkt)
# ── Rate limit handler ───────────────────────────────────────────────────
def _rate_limit_handler(e):
logger.warning(
f"Rate limit overschreden: {request.method} {request.path} "
f"van {request.remote_addr}"
)
return jsonify({
'error': 'Te veel verzoeken. Probeer later opnieuw.',
'retry_after': e.retry_after,
}), 429
# ── ProxyFix (Flask zit achter nginx) ────────────────────────────────────
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1)
# Extensions
# ── Rate limiter ──────────────────────────────────────────────────────────
redis_url = os.environ.get('REDIS_URL', '')
if not redis_url:
logger.warning(
"REDIS_URL niet ingesteld — rate limiter gebruikt in-memory storage. "
"Dit werkt NIET correct met meerdere gunicorn workers! "
"Stel REDIS_URL in voor productie."
)
redis_url = 'memory://'
limiter.init_app(
app,
storage_uri=redis_url,
strategy='fixed-window-elastic-expiry', # robuuster dan fixed-window
on_breach=_rate_limit_handler,
)
# ── Security headers via Talisman ─────────────────────────────────────────
# CSP: strikte whitelist — geen inline scripts, geen externe resources buiten cdnjs
# CSP: nonce-based voor scripts (Talisman injecteert {{ csp_nonce() }} in templates)
# unsafe-inline is uitgeschakeld voor scripts — gebruik {{ csp_nonce() }} in <script> tags
csp = {
'default-src': ["'self'"],
'script-src': ["'self'", 'cdnjs.cloudflare.com'], # nonce wordt auto toegevoegd
'style-src': ["'self'", "'unsafe-inline'"], # inline styles in templates (aanvaardbaar)
'img-src': ["'self'", 'data:'],
'font-src': ["'self'"],
'connect-src': ["'self'"],
'form-action': ["'self'"], # voorkomt form hijacking
'base-uri': ["'self'"], # voorkomt base tag injection
'frame-ancestors': ["'none'"], # clickjacking preventie
'object-src': ["'none'"], # geen Flash/plugins
}
Talisman(
app,
force_https=is_https,
strict_transport_security=is_https,
strict_transport_security_max_age=31536000, # 1 jaar HSTS
strict_transport_security_include_subdomains=True,
strict_transport_security_preload=False, # alleen aanvragen als je zeker bent
content_security_policy=csp,
content_security_policy_nonce_in=['script-src'], # nonce auto toegevoegd aan script tags
x_content_type_options=True,
x_frame_options='DENY',
referrer_policy='strict-origin-when-cross-origin',
feature_policy={ # moderne vervanger van Permissions-Policy
'geolocation': ''none'',
'microphone': ''none'',
'camera': ''none'',
}
)
# ── Extensions ────────────────────────────────────────────────────────────
db.init_app(app)
migrate.init_app(app, db)
login_manager.init_app(app)
login_manager.login_view = 'auth.login'
login_manager.login_view = 'auth.login'
login_manager.login_message = 'Gelieve in te loggen.'
# Import models (zodat Flask-Migrate ze kent)
from models import User, School, SchoolYear, Class, TeacherClass, Assessment
# Import models zodat Flask-Migrate ze kent
from models import User, School, SchoolYear, Class, TeacherClass, Assessment, AuditLog
@login_manager.user_loader
def load_user(user_id):
return User.query.get(int(user_id))
# Blueprints registreren
from routes.auth import auth_bp
from routes.api import api_bp
@login_manager.unauthorized_handler
def unauthorized():
if request.is_json or request.path.startswith('/api/') or request.path.startswith('/admin/'):
return jsonify({'error': 'Niet ingelogd'}), 401
from flask import redirect, url_for
return redirect(url_for('auth.login'))
# ── Globale error handlers ───────────────────────────────────────────────
from flask_limiter.errors import RateLimitExceeded
@app.errorhandler(RateLimitExceeded)
def handle_rate_limit(e):
return _rate_limit_handler(e)
@app.errorhandler(404)
def not_found(e):
if request.is_json or request.path.startswith(('/api/', '/admin/')):
return jsonify({'error': 'Niet gevonden'}), 404
from flask import redirect, url_for
return redirect(url_for('auth.login'))
@app.errorhandler(500)
def server_error(e):
logger.error(f"500 fout: {e}", exc_info=True)
if request.is_json or request.path.startswith(('/api/', '/admin/')):
return jsonify({'error': 'Serverfout — probeer later opnieuw'}), 500
return redirect(url_for('auth.login'))
# ── Blueprints ────────────────────────────────────────────────────────────
from routes.auth import auth_bp
from routes.api import api_bp
from routes.admin import admin_bp
from routes.pages import pages_bp
app.register_blueprint(auth_bp, url_prefix='/auth')
app.register_blueprint(api_bp, url_prefix='/api')
app.register_blueprint(auth_bp, url_prefix='/auth')
app.register_blueprint(api_bp, url_prefix='/api')
app.register_blueprint(admin_bp, url_prefix='/admin')
app.register_blueprint(pages_bp)
# ── Auditlog cleanup (1 jaar bewaren) ─────────────────────────────────────
# ── CLI commando's ────────────────────────────────────────────────────────
@app.cli.command('cleanup-audit')
def cleanup_audit():
"""Verwijder auditlog entries ouder dan 1 jaar. Voer uit via cron of handmatig."""
"""Verwijder auditlog entries ouder dan 1 jaar."""
from models import AuditLog
from datetime import datetime, timedelta
cutoff = datetime.utcnow() - timedelta(days=365)
@@ -73,4 +176,5 @@ def create_app():
app = create_app()
if __name__ == '__main__':
app.run(debug=True, host='0.0.0.0', port=5000)
# Nooit debug=True in productie — gebruik gunicorn via entrypoint.sh
app.run(debug=False, host='127.0.0.1', port=5000)

View File

@@ -106,7 +106,8 @@ class User(UserMixin, db.Model):
def is_teacher(self): return self.role == 'teacher'
def set_password(self, password):
self.password_hash = generate_password_hash(password)
# scrypt is het sterkste algoritme in werkzeug — veel meer weerstand tegen brute force
self.password_hash = generate_password_hash(password, method='scrypt')
def check_password(self, password):
if not self.password_hash:

View File

@@ -1,12 +1,14 @@
flask==3.0.3
flask==3.1.1
flask-login==0.6.3
flask-sqlalchemy==3.1.1
flask-migrate==4.0.7
flask-cors==4.0.1
psycopg2-binary==2.9.9
gunicorn==22.0.0
python-dotenv==1.0.1
werkzeug==3.0.3
sqlalchemy==2.0.31
authlib==1.3.1 # OAuth2 (Microsoft + Google, later)
flask-migrate==4.1.0
flask-limiter==3.9.0 # rate limiting
flask-talisman==1.1.0 # security headers (CSP, HSTS, X-Frame-Options, ...)
psycopg2-binary==2.9.10
gunicorn==23.0.0
python-dotenv==1.1.0
werkzeug==3.1.3
sqlalchemy==2.0.41
authlib==1.4.1
requests==2.32.3
redis==5.2.1 # backend voor flask-limiter in productie

View File

@@ -303,8 +303,7 @@ def add_scholengroep_ict():
db.session.add(user)
db.session.flush()
audit_log('user.create', 'user', target_type='user', target_id=str(user.id),
detail={'email': email, 'role': user.role, 'school_id': school_id},
school_id=school_id)
detail={'email': email, 'role': user.role})
db.session.commit()
return jsonify({'user': user.to_dict()}), 201

View File

@@ -2,10 +2,10 @@ from datetime import datetime
from flask import Blueprint, jsonify, request
from flask_login import login_required, current_user
from models import User, SchoolYear, Assessment, School, Class, AuditLog
from app import db
from services.doelen import load_index, load_vak, is_valid_vak_id
from services.audit import audit_log
from functools import wraps
from app import db, limiter
api_bp = Blueprint('api', __name__)
@@ -71,6 +71,7 @@ def get_assessments():
@api_bp.route('/assessments', methods=['POST'])
@login_required
@limiter.limit('120 per minute') # max 2 per seconde per gebruiker
def save_assessment():
data = request.get_json() or {}
vak_id = (data.get('vak_id') or '').strip()
@@ -81,6 +82,9 @@ def save_assessment():
return jsonify({'error': 'vak_id en goal_id zijn verplicht'}), 400
if status not in ('groen', 'oranje', 'roze', ''):
return jsonify({'error': 'Ongeldige status — gebruik groen, oranje, roze of leeg'}), 400
# Sanitiseer input — voorkomt oversized data in DB
if len(vak_id) > 100 or len(goal_id) > 50:
return jsonify({'error': 'Ongeldige invoer'}), 400
if not current_user.school_id:
return jsonify({'error': 'Account is nog niet gekoppeld aan een school'}), 400
@@ -283,8 +287,8 @@ def get_audit_log():
if not current_user.is_school_ict:
return jsonify({'error': 'Geen toegang'}), 403
page = int(request.args.get('page', 1))
per_page = int(request.args.get('per_page', 50))
page = max(1, int(request.args.get('page', 1)))
per_page = min(100, max(1, int(request.args.get('per_page', 50)))) # max 100 per pagina
category = request.args.get('category')
search = request.args.get('search', '').strip()

View File

@@ -15,19 +15,33 @@ import os
import secrets
import logging
from datetime import datetime
from urllib.parse import urlencode
from urllib.parse import urlencode, urlparse
import requests
from services.audit import audit_log
from app import db, limiter
from flask import (Blueprint, render_template, request, redirect,
url_for, flash, jsonify, session, current_app)
from flask_login import login_user, logout_user, login_required, current_user
from models import User, School
from app import db
logger = logging.getLogger(__name__)
auth_bp = Blueprint('auth', __name__)
def _safe_next_url(next_url: str | None) -> str:
"""Valideer dat de next-redirect intern blijft — voorkomt open redirect aanvallen."""
if not next_url:
return url_for('pages.dashboard')
parsed = urlparse(next_url)
# Weiger externe URLs (heeft netloc) of protocol-relative URLs
if parsed.netloc or parsed.scheme:
return url_for('pages.dashboard')
# Zorg dat het pad begint met /
if not next_url.startswith('/'):
return url_for('pages.dashboard')
return next_url
ENTRA_AUTHORITY = "https://login.microsoftonline.com/common"
ENTRA_AUTH_URL = f"{ENTRA_AUTHORITY}/oauth2/v2.0/authorize"
ENTRA_TOKEN_URL = f"{ENTRA_AUTHORITY}/oauth2/v2.0/token"
@@ -117,6 +131,7 @@ def logout():
@auth_bp.route('/microsoft')
@limiter.limit('20 per minute')
def microsoft_login():
if not _entra_client_id():
flash('Microsoft login is niet geconfigureerd.', 'error')
@@ -138,6 +153,7 @@ def microsoft_login():
@auth_bp.route('/callback')
@limiter.limit('20 per minute')
def microsoft_callback():
error = request.args.get('error')
if error:
@@ -220,10 +236,11 @@ def microsoft_callback():
db.session.commit()
logger.info(f"Entra login: {email} (nieuw: {is_new}, school_id: {user.school_id})")
return redirect(request.args.get('next') or url_for('pages.dashboard'))
return redirect(_safe_next_url(request.args.get('next')))
@auth_bp.route('/setup', methods=['GET', 'POST'])
@limiter.limit('5 per minute')
def setup():
admin = User.query.filter_by(role='superadmin').first()
if admin and admin.password_hash:
@@ -252,7 +269,7 @@ def setup():
first_name='Super', last_name='Admin')
db.session.add(admin)
admin.set_password(password)
admin.set_password(password) # pbkdf2:sha256 — zie models.py voor hash methode
db.session.commit()
if request.is_json:
@@ -264,6 +281,7 @@ def setup():
@auth_bp.route('/superadmin-login', methods=['POST'])
@limiter.limit('10 per minute; 30 per hour')
def superadmin_login():
"""Fallback login ENKEL voor de superadmin — niet voor gewone gebruikers."""
if current_user.is_authenticated: