first commit

This commit is contained in:
2026-02-28 00:02:02 +01:00
commit 6295c58d33
36 changed files with 7017 additions and 0 deletions

View File

544
backend/routes/admin.py Normal file
View File

@@ -0,0 +1,544 @@
"""
Admin routes
Toegang per rol:
superadmin → alles
scholengroep_ict → scholen + gebruikers beheren, doelen uploaden
school_ict → leerkrachten en klassen van eigen school beheren
"""
import re
import json as jsonlib
from flask import Blueprint, jsonify, request
from services.audit import audit_log
from flask_login import login_required, current_user
from models import User, School, SchoolYear, Class, TeacherClass
from app import db
from functools import wraps
admin_bp = Blueprint('admin', __name__)
VALID_ROLES = ('superadmin', 'scholengroep_ict', 'school_ict', 'director', 'teacher')
# ── Toegangsdecorators ────────────────────────────────────────────────────────
def superadmin_required(f):
@wraps(f)
def decorated(*args, **kwargs):
if not current_user.is_superadmin:
return jsonify({'error': 'Geen toegang — superadmin vereist'}), 403
return f(*args, **kwargs)
return decorated
def scholengroep_ict_required(f):
@wraps(f)
def decorated(*args, **kwargs):
if not current_user.is_scholengroep_ict:
return jsonify({'error': 'Geen toegang — scholengroep ICT vereist'}), 403
return f(*args, **kwargs)
return decorated
def school_ict_required(f):
@wraps(f)
def decorated(*args, **kwargs):
if not current_user.is_school_ict:
return jsonify({'error': 'Geen toegang — school ICT vereist'}), 403
return f(*args, **kwargs)
return decorated
# ── Scholen (scholengroep_ict) ────────────────────────────────────────────────
@admin_bp.route('/schools', methods=['GET'])
@login_required
@scholengroep_ict_required
def list_schools():
schools = School.query.order_by(School.name).all()
result = []
for s in schools:
d = s.to_dict()
d['user_count'] = User.query.filter_by(school_id=s.id, is_active=True).count()
result.append(d)
return jsonify({'schools': result})
@admin_bp.route('/schools', methods=['POST'])
@login_required
@scholengroep_ict_required
def create_school():
data = request.get_json() or {}
name = data.get('name', '').strip()
slug = data.get('slug', '').strip().lower()
domains = [d.strip().lower() for d in data.get('email_domains', []) if d.strip()]
if not name:
return jsonify({'error': 'Naam is verplicht'}), 400
if not slug:
slug = re.sub(r'[^a-z0-9]+', '-', name.lower()).strip('-')
if School.query.filter_by(slug=slug).first():
return jsonify({'error': f'Slug "{slug}" is al in gebruik'}), 409
school = School(name=name, slug=slug, email_domains=domains)
db.session.add(school)
db.session.flush()
audit_log('school.create', 'school', target_type='school', target_id=str(school.id),
detail={'name': name, 'slug': slug}, school_id=school.id)
db.session.commit()
return jsonify({'school': school.to_dict()}), 201
@admin_bp.route('/schools/<int:school_id>', methods=['PUT'])
@login_required
@scholengroep_ict_required
def update_school(school_id):
school = School.query.get_or_404(school_id)
data = request.get_json() or {}
if 'name' in data:
school.name = data['name'].strip()
if 'email_domains' in data:
school.email_domains = [d.strip().lower() for d in data['email_domains'] if d.strip()]
db.session.commit()
return jsonify({'school': school.to_dict()})
@admin_bp.route('/schools/<int:school_id>', methods=['DELETE'])
@login_required
@scholengroep_ict_required
def delete_school(school_id):
school = School.query.get_or_404(school_id)
school_name = school.name
try:
audit_log('school.delete', 'school', target_type='school', target_id=str(school_id),
detail={'name': school_name}, school_id=school_id)
db.session.flush()
db.session.execute(
db.text("DELETE FROM schools WHERE id = :id"),
{"id": school_id}
)
db.session.commit()
except Exception as e:
db.session.rollback()
return jsonify({'error': f'Verwijderen mislukt: {str(e)}'}), 500
return jsonify({'deleted': True})
# ── Schooljaren (globaal — niet per school) ──────────────────────────────────
@admin_bp.route('/years', methods=['GET'])
@login_required
@school_ict_required
def list_years():
"""Alle globale schooljaren, nieuwste eerst."""
years = SchoolYear.query.filter_by(school_id=None) .order_by(SchoolYear.label.desc()).all()
return jsonify({'years': [y.to_dict() for y in years]})
@admin_bp.route('/years', methods=['POST'])
@login_required
@scholengroep_ict_required
def create_year():
"""Maak een nieuw globaal schooljaar aan."""
data = request.get_json() or {}
label = data.get('label', '').strip()
if not label:
return jsonify({'error': 'Label is verplicht (bv. 2025-2026)'}), 400
if SchoolYear.query.filter_by(label=label).first():
return jsonify({'error': f'Schooljaar {label} bestaat al'}), 409
if data.get('set_active', True):
SchoolYear.query.filter_by(school_id=None, is_active=True) .update({'is_active': False})
year = SchoolYear(school_id=None, label=label,
is_active=data.get('set_active', True))
db.session.add(year)
db.session.flush()
audit_log('year.create', 'system', target_type='school_year', target_id=str(year.id),
detail={'label': label, 'active': year.is_active})
db.session.commit()
return jsonify({'year': year.to_dict()}), 201
@admin_bp.route('/years/<int:year_id>/activate', methods=['PUT'])
@login_required
@scholengroep_ict_required
def activate_year(year_id):
"""Zet een schooljaar als actief (deactiveert de rest)."""
year = SchoolYear.query.filter_by(id=year_id, school_id=None).first_or_404()
SchoolYear.query.filter_by(school_id=None, is_active=True) .update({'is_active': False})
year.is_active = True
audit_log('year.activate', 'system', target_type='school_year', target_id=str(year_id),
detail={'label': year.label})
db.session.commit()
return jsonify({'year': year.to_dict()})
# ── Gebruikers per school ─────────────────────────────────────────────────────
@admin_bp.route('/schools/<int:school_id>/users', methods=['GET'])
@login_required
@school_ict_required
def list_school_users(school_id):
if not current_user.is_scholengroep_ict and current_user.school_id != school_id:
return jsonify({'error': 'Geen toegang tot deze school'}), 403
users = User.query.filter_by(school_id=school_id, is_active=True)\
.order_by(User.last_name, User.first_name).all()
return jsonify({'users': [u.to_dict() for u in users]})
@admin_bp.route('/schools/<int:school_id>/users', methods=['POST'])
@login_required
@school_ict_required
def add_user_to_school(school_id):
if not current_user.is_scholengroep_ict and current_user.school_id != school_id:
return jsonify({'error': 'Geen toegang tot deze school'}), 403
School.query.get_or_404(school_id)
data = request.get_json() or {}
email = data.get('email', '').strip().lower()
role = data.get('role', 'teacher')
if not email:
return jsonify({'error': 'E-mailadres is verplicht'}), 400
allowed_roles = ('teacher', 'director', 'school_ict')
if current_user.role == 'school_ict' and role not in allowed_roles:
return jsonify({'error': f'Rol "{role}" mag niet worden toegewezen door school ICT'}), 403
if role not in VALID_ROLES:
return jsonify({'error': f'Ongeldige rol: {role}'}), 400
existing = User.query.filter_by(email=email).first()
if existing:
# Account bestaat al (ook als uitgeschakeld) — activeer en update rol/school
existing.school_id = school_id
existing.role = role
existing.is_active = True
db.session.commit()
return jsonify({'user': existing.to_dict(), 'linked': True})
user = User(
email=email,
first_name=data.get('first_name', '').strip(),
last_name=data.get('last_name', '').strip(),
role=role,
school_id=school_id,
is_active=True,
)
db.session.add(user)
db.session.commit()
return jsonify({'user': user.to_dict(), 'linked': False}), 201
@admin_bp.route('/schools/<int:school_id>/users/<int:user_id>/role', methods=['PUT'])
@login_required
@school_ict_required
def update_user_role(school_id, user_id):
if not current_user.is_scholengroep_ict and current_user.school_id != school_id:
return jsonify({'error': 'Geen toegang tot deze school'}), 403
user = User.query.filter_by(id=user_id, school_id=school_id).first_or_404()
data = request.get_json() or {}
role = data.get('role', '')
allowed = ('teacher', 'director', 'school_ict')
if current_user.role == 'school_ict' and role not in allowed:
return jsonify({'error': f'Rol "{role}" mag niet worden toegewezen'}), 403
if role not in VALID_ROLES:
return jsonify({'error': f'Ongeldige rol: {role}'}), 400
user.role = role
db.session.commit()
return jsonify({'user': user.to_dict()})
@admin_bp.route('/schools/<int:school_id>/users/<int:user_id>', methods=['DELETE'])
@login_required
@school_ict_required
def remove_user_from_school(school_id, user_id):
if not current_user.is_scholengroep_ict and current_user.school_id != school_id:
return jsonify({'error': 'Geen toegang tot deze school'}), 403
user = User.query.filter_by(id=user_id, school_id=school_id).first_or_404()
user.is_active = False
audit_log('user.deactivate', 'user', target_type='user', target_id=str(user_id),
detail={'email': user.email, 'role': user.role},
school_id=current_user.school_id)
db.session.commit()
return jsonify({'deleted': True})
# ── Scholengroep ICT beheer (superadmin) ──────────────────────────────────────
@admin_bp.route('/scholengroep-ict', methods=['GET'])
@login_required
@superadmin_required
def list_scholengroep_ict():
users = User.query.filter_by(role='scholengroep_ict', is_active=True)\
.order_by(User.last_name).all()
return jsonify({'users': [u.to_dict() for u in users]})
@admin_bp.route('/scholengroep-ict', methods=['POST'])
@login_required
@superadmin_required
def add_scholengroep_ict():
data = request.get_json() or {}
email = data.get('email', '').strip().lower()
if not email:
return jsonify({'error': 'E-mailadres is verplicht'}), 400
user = User.query.filter_by(email=email).first()
if user:
user.role = 'scholengroep_ict'
user.school_id = None
user.is_active = True
db.session.commit()
return jsonify({'user': user.to_dict()})
user = User(
email=email,
first_name=data.get('first_name', '').strip(),
last_name=data.get('last_name', '').strip(),
role='scholengroep_ict',
is_active=True,
)
db.session.add(user)
db.session.flush()
audit_log('user.create', 'user', target_type='user', target_id=str(user.id),
detail={'email': email, 'role': user.role, 'school_id': school_id},
school_id=school_id)
db.session.commit()
return jsonify({'user': user.to_dict()}), 201
@admin_bp.route('/scholengroep-ict/<int:user_id>', methods=['DELETE'])
@login_required
@superadmin_required
def remove_scholengroep_ict(user_id):
user = User.query.get_or_404(user_id)
if user.role != 'scholengroep_ict':
return jsonify({'error': 'Gebruiker is geen scholengroep ICT'}), 400
user.is_active = False
db.session.commit()
return jsonify({'ok': True})
# ── Doelen upload (scholengroep_ict) ──────────────────────────────────────────
@admin_bp.route('/doelen', methods=['GET'])
@login_required
@scholengroep_ict_required
def list_doelen():
from services.doelen import load_index, list_installed_vakken
index = load_index()
installed = set(list_installed_vakken())
return jsonify({
'vakken': index.get('vakken', []),
'versie': index.get('versie'),
'installed': list(installed),
})
@admin_bp.route('/doelen/upload', methods=['POST'])
@login_required
@scholengroep_ict_required
def upload_doelen():
"""
Upload één of meerdere vak JSON bestanden via multipart/form-data (veld: 'files').
Bij maandelijkse update gewoon opnieuw uploaden — overschrijft bestaande bestanden.
"""
from services.doelen import validate_vak_json, save_vak, is_valid_vak_id
if 'files' not in request.files:
return jsonify({'error': 'Geen bestanden ontvangen (verwacht veld "files")'}), 400
results = []
for file in request.files.getlist('files'):
if not file.filename:
continue
result = {'filename': file.filename, 'ok': False}
if not file.filename.lower().endswith('.json'):
result['error'] = 'Alleen .json bestanden zijn toegestaan'
results.append(result)
continue
try:
data = jsonlib.loads(file.read().decode('utf-8'))
except Exception:
result['error'] = 'Ongeldig JSON — kon bestand niet lezen'
results.append(result)
continue
# Vak ID: uit het bestand zelf, anders van de bestandsnaam
vak_id = (data.get('vak') or file.filename[:-5]).lower().strip()
if not is_valid_vak_id(vak_id):
result['error'] = f'Ongeldig vak ID: "{vak_id}"'
results.append(result)
continue
errors = validate_vak_json(data)
if errors:
result['error'] = '; '.join(errors)
results.append(result)
continue
doelzinnen = [r for r in data['rijen'] if r.get('type') == 'doelzin']
save_vak(vak_id, data)
result.update({
'ok': True,
'vak_id': vak_id,
'vak_naam': data.get('vakNaam') or vak_id,
'aantalDoelzinnen': len(doelzinnen),
'versie': data.get('versie', '?'),
})
results.append(result)
ok_count = sum(1 for r in results if r['ok'])
err_count = len(results) - ok_count
return jsonify({
'ok': ok_count,
'errors': err_count,
'results': results,
}), (200 if ok_count > 0 else 400)
@admin_bp.route('/doelen/<vak_id>', methods=['DELETE'])
@login_required
@scholengroep_ict_required
def delete_doelen(vak_id):
from services.doelen import delete_vak, is_valid_vak_id
if not is_valid_vak_id(vak_id):
return jsonify({'error': 'Ongeldig vak ID'}), 400
if not delete_vak(vak_id):
return jsonify({'error': 'Bestand niet gevonden'}), 404
return jsonify({'deleted': True, 'vak_id': vak_id})
# ── Globale statistieken (superadmin) ─────────────────────────────────────────
@admin_bp.route('/stats')
@login_required
@scholengroep_ict_required
def global_stats():
return jsonify({
'schools': School.query.count(),
'users': User.query.filter_by(is_active=True).count(),
'teachers': User.query.filter_by(role='teacher', is_active=True).count(),
'directors': User.query.filter_by(role='director', is_active=True).count(),
'school_ict': User.query.filter_by(role='school_ict', is_active=True).count(),
'scholengroep_ict': User.query.filter_by(role='scholengroep_ict',is_active=True).count(),
})
# ── Klassen ───────────────────────────────────────────────────────────────────
@admin_bp.route('/schools/<int:school_id>/classes', methods=['GET'])
@login_required
@school_ict_required
def list_classes(school_id):
if not current_user.is_scholengroep_ict and current_user.school_id != school_id:
return jsonify({'error': 'Geen toegang'}), 403
classes = Class.query.filter_by(school_id=school_id) .order_by(Class.name).all()
return jsonify({'classes': [c.to_dict() for c in classes]})
@admin_bp.route('/schools/<int:school_id>/classes', methods=['POST'])
@login_required
@school_ict_required
def create_class(school_id):
if not current_user.is_scholengroep_ict and current_user.school_id != school_id:
return jsonify({'error': 'Geen toegang'}), 403
data = request.get_json() or {}
name = data.get('name', '').strip()
if not name:
return jsonify({'error': 'Naam is verplicht'}), 400
if Class.query.filter_by(school_id=school_id, name=name).first():
return jsonify({'error': f'Klas "{name}" bestaat al'}), 409
klas = Class(school_id=school_id, name=name)
db.session.add(klas)
db.session.flush()
audit_log('class.create', 'class', target_type='class', target_id=str(klas.id),
detail={'name': name}, school_id=school_id)
db.session.commit()
return jsonify({'class': klas.to_dict()}), 201
@admin_bp.route('/schools/<int:school_id>/classes/<int:class_id>', methods=['DELETE'])
@login_required
@school_ict_required
def delete_class(school_id, class_id):
if not current_user.is_scholengroep_ict and current_user.school_id != school_id:
return jsonify({'error': 'Geen toegang'}), 403
klas = Class.query.filter_by(id=class_id, school_id=school_id).first_or_404()
audit_log('class.delete', 'class', target_type='class', target_id=str(class_id),
detail={'name': klas.name}, school_id=school_id)
db.session.delete(klas)
db.session.commit()
return jsonify({'deleted': True})
@admin_bp.route('/schools/<int:school_id>/classes/<int:class_id>/teachers', methods=['PUT'])
@login_required
@school_ict_required
def set_class_teachers(school_id, class_id):
"""Vervang alle leerkrachten van een klas in één keer."""
if not current_user.is_scholengroep_ict and current_user.school_id != school_id:
return jsonify({'error': 'Geen toegang'}), 403
klas = Class.query.filter_by(id=class_id, school_id=school_id).first_or_404()
data = request.get_json() or {}
user_ids = data.get('user_ids', [])
teachers = User.query.filter(
User.id.in_(user_ids),
User.school_id == school_id,
User.is_active == True
).all()
klas.teachers = teachers
audit_log('class.teachers_updated', 'class', target_type='class', target_id=str(class_id),
detail={'name': klas.name, 'teacher_ids': user_ids}, school_id=school_id)
db.session.commit()
return jsonify({'class': klas.to_dict()})
@admin_bp.route('/users/<int:user_id>/classes', methods=['GET'])
@login_required
def get_user_classes(user_id):
"""Geeft klassen terug van een specifieke leerkracht (voor leerkracht zelf of beheerder)."""
if current_user.id != user_id and not current_user.is_school_ict:
return jsonify({'error': 'Geen toegang'}), 403
user = User.query.get_or_404(user_id)
classes = Class.query.filter_by(school_id=user.school_id) .order_by(Class.name).all()
return jsonify({
'all_classes': [{'id': c.id, 'name': c.name} for c in classes],
'my_classes': [{'id': c.id, 'name': c.name} for c in user.classes],
})
@admin_bp.route('/users/<int:user_id>/classes', methods=['PUT'])
@login_required
def set_user_classes(user_id):
"""Leerkracht stelt eigen klassen in, of beheerder doet het."""
if current_user.id != user_id and not current_user.is_school_ict:
return jsonify({'error': 'Geen toegang'}), 403
user = User.query.get_or_404(user_id)
data = request.get_json() or {}
class_ids = data.get('class_ids', [])
classes = Class.query.filter(
Class.id.in_(class_ids),
Class.school_id == user.school_id
).all()
user.classes = classes
audit_log('class.user_assignment', 'class', target_type='user', target_id=str(user_id),
detail={'class_ids': class_ids, 'class_names': [c.name for c in classes]},
school_id=user.school_id)
db.session.commit()
return jsonify({'classes': [{'id': c.id, 'name': c.name} for c in user.classes]})

316
backend/routes/api.py Normal file
View File

@@ -0,0 +1,316 @@
from datetime import datetime
from flask import Blueprint, jsonify, request
from flask_login import login_required, current_user
from models import User, SchoolYear, Assessment, School, Class, AuditLog
from app import db
from services.doelen import load_index, load_vak, is_valid_vak_id
from services.audit import audit_log
from functools import wraps
api_bp = Blueprint('api', __name__)
def director_required(f):
@wraps(f)
def decorated(*args, **kwargs):
if not current_user.is_director:
return jsonify({'error': 'Geen toegang'}), 403
return f(*args, **kwargs)
return decorated
def get_active_year(school_id=None):
"""Geeft het globaal actief schooljaar terug (school_id wordt genegeerd)."""
return SchoolYear.query.filter_by(school_id=None, is_active=True).first()
# ── Doelen (statische JSON bestanden) ─────────────────────────────────────────
@api_bp.route('/doelen/index')
@login_required
def doelen_index():
data = load_index()
if not data['vakken']:
return jsonify({
'error': 'Geen doelen gevonden. Upload eerst de JSON bestanden via het beheerderspaneel.'
}), 404
return jsonify(data)
@api_bp.route('/doelen/<vak_id>')
@login_required
def doelen_vak(vak_id):
if not is_valid_vak_id(vak_id):
return jsonify({'error': 'Ongeldig vak ID'}), 400
data = load_vak(vak_id)
if not data:
return jsonify({'error': f'Vak "{vak_id}" niet gevonden'}), 404
return jsonify(data)
# ── Beoordelingen ─────────────────────────────────────────────────────────────
@api_bp.route('/assessments', methods=['GET'])
@login_required
def get_assessments():
if not current_user.school_id:
return jsonify({'assessments': []})
school_year = get_active_year(current_user.school_id)
if not school_year:
return jsonify({'assessments': []})
year_id = request.args.get('year_id', school_year.id)
vak_id = request.args.get('vak_id')
query = Assessment.query.filter_by(user_id=current_user.id, school_year_id=year_id)
if vak_id:
query = query.filter_by(vak_id=vak_id)
return jsonify({'assessments': [a.to_dict() for a in query.all()]})
@api_bp.route('/assessments', methods=['POST'])
@login_required
def save_assessment():
data = request.get_json() or {}
vak_id = (data.get('vak_id') or '').strip()
goal_id = (data.get('goal_id') or '').strip()
status = (data.get('status') or '').strip()
if not vak_id or not goal_id:
return jsonify({'error': 'vak_id en goal_id zijn verplicht'}), 400
if status not in ('groen', 'oranje', 'roze', ''):
return jsonify({'error': 'Ongeldige status — gebruik groen, oranje, roze of leeg'}), 400
if not current_user.school_id:
return jsonify({'error': 'Account is nog niet gekoppeld aan een school'}), 400
school_year = get_active_year(current_user.school_id)
if not school_year:
return jsonify({'error': 'Geen actief schooljaar gevonden'}), 400
assessment = Assessment.query.filter_by(
user_id=current_user.id,
school_year_id=school_year.id,
vak_id=vak_id,
goal_id=goal_id,
).first()
if status == '':
if assessment:
db.session.delete(assessment)
db.session.commit()
return jsonify({'deleted': True})
if assessment:
assessment.status = status
assessment.updated_at = datetime.utcnow()
else:
assessment = Assessment(
user_id=current_user.id,
school_id=current_user.school_id,
school_year_id=school_year.id,
vak_id=vak_id,
goal_id=goal_id,
status=status,
)
db.session.add(assessment)
db.session.commit()
# Auditlog enkel bij statuswijziging (niet bij elke klik)
audit_log('assessment.save', 'assessment',
target_type='goal', target_id=f'{vak_id}:{goal_id}',
detail={'status': status})
return jsonify({'assessment': assessment.to_dict()})
# ── Directeur schooloverzicht ──────────────────────────────────────────────────
@api_bp.route('/school/overview')
@login_required
@director_required
def school_overview():
if not current_user.school_id:
return jsonify({'error': 'Geen school gekoppeld'}), 400
school_year = get_active_year(current_user.school_id)
if not school_year:
return jsonify({'error': 'Geen actief schooljaar'}), 400
# year_id param: directeur/admin kan wisselen, leerkracht zit vast aan actief jaar
year_id_param = request.args.get('year_id')
if year_id_param and current_user.is_director:
year_id = int(year_id_param)
selected_year = SchoolYear.query.filter_by(
id=year_id, school_id=current_user.school_id
).first() or school_year
else:
selected_year = school_year
year_id = school_year.id
vak_id = request.args.get('vak_id')
teachers = User.query.filter_by(
school_id=current_user.school_id, role='teacher', is_active=True
).all()
query = Assessment.query.filter_by(
school_id=current_user.school_id, school_year_id=year_id
)
if vak_id:
query = query.filter_by(vak_id=vak_id)
by_teacher = {t.id: {} for t in teachers}
for a in query.all():
by_teacher.setdefault(a.user_id, {})
by_teacher[a.user_id].setdefault(a.vak_id, {})
by_teacher[a.user_id][a.vak_id][a.goal_id] = a.status
return jsonify({
'school_year': selected_year.to_dict(),
'teachers': [t.to_dict() for t in teachers],
'assessments_by_teacher': by_teacher,
})
# ── Gebruikersbeheer (school_ict / directeur) ──────────────────────────────────
@api_bp.route('/users', methods=['GET'])
@login_required
@director_required
def list_users():
users = User.query.filter_by(
school_id=current_user.school_id, is_active=True
).order_by(User.last_name, User.first_name).all()
return jsonify({'users': [u.to_dict() for u in users]})
@api_bp.route('/users', methods=['POST'])
@login_required
@director_required
def create_user():
data = request.get_json() or {}
email = data.get('email', '').strip().lower()
if not email:
return jsonify({'error': 'E-mailadres is verplicht'}), 400
if User.query.filter_by(email=email).first():
return jsonify({'error': 'E-mailadres is al in gebruik'}), 409
user = User(
email=email,
first_name=data.get('first_name', '').strip(),
last_name=data.get('last_name', '').strip(),
role='teacher',
school_id=current_user.school_id,
)
db.session.add(user)
db.session.commit()
return jsonify({'user': user.to_dict()}), 201
@api_bp.route('/users/<int:user_id>', methods=['DELETE'])
@login_required
@director_required
def delete_user(user_id):
user = User.query.filter_by(
id=user_id, school_id=current_user.school_id
).first_or_404()
user.is_active = False
db.session.commit()
return jsonify({'deleted': True})
# ── Schooljaren (directeur/admin leesbaar) ────────────────────────────────────
@api_bp.route('/school/years')
@login_required
@director_required
def get_school_years():
"""Geeft alle globale schooljaren terug (voor jaarselectie in directeur dashboard)."""
years = SchoolYear.query.filter_by(school_id=None) .order_by(SchoolYear.label.desc()).all()
return jsonify({'years': [y.to_dict() for y in years]})
# ── Huidig ingelogde gebruiker ────────────────────────────────────────────────
@api_bp.route('/me')
@login_required
def me():
school_year = get_active_year(current_user.school_id) if current_user.school_id else None
return jsonify({
'user': current_user.to_dict(),
'school_year': school_year.to_dict() if school_year else None,
})
# ── Klassen voor leerkracht (zelf instellen) ──────────────────────────────────
@api_bp.route('/my/classes', methods=['GET'])
@login_required
def my_classes():
"""Geeft alle beschikbare klassen en eigen klassen terug."""
if not current_user.school_id:
return jsonify({'all_classes': [], 'my_classes': []})
all_cls = Class.query.filter_by(school_id=current_user.school_id) .order_by(Class.name).all()
return jsonify({
'all_classes': [{'id': c.id, 'name': c.name} for c in all_cls],
'my_classes': [{'id': c.id, 'name': c.name} for c in current_user.classes],
})
@api_bp.route('/my/classes', methods=['PUT'])
@login_required
def set_my_classes():
"""Leerkracht stelt eigen klassen in."""
data = request.get_json() or {}
class_ids = data.get('class_ids', [])
classes = Class.query.filter(
Class.id.in_(class_ids),
Class.school_id == current_user.school_id
).all()
current_user.classes = classes
audit_log('class.user_assignment', 'class', target_type='user',
target_id=str(current_user.id),
detail={'class_ids': class_ids, 'class_names': [c.name for c in classes]})
db.session.commit()
return jsonify({'my_classes': [{'id': c.id, 'name': c.name} for c in current_user.classes]})
# ── Auditlog ──────────────────────────────────────────────────────────────────
@api_bp.route('/audit-log')
@login_required
def get_audit_log():
if not current_user.is_school_ict:
return jsonify({'error': 'Geen toegang'}), 403
page = int(request.args.get('page', 1))
per_page = int(request.args.get('per_page', 50))
category = request.args.get('category')
search = request.args.get('search', '').strip()
query = AuditLog.query
# School ICT ziet enkel eigen school
if not current_user.is_scholengroep_ict:
query = query.filter(AuditLog.school_id == current_user.school_id)
if category:
query = query.filter(AuditLog.category == category)
if search:
query = query.filter(
db.or_(
AuditLog.action.ilike(f'%{search}%'),
AuditLog.detail.ilike(f'%{search}%'),
)
)
total = query.count()
entries = query.order_by(AuditLog.timestamp.desc()) .offset((page - 1) * per_page).limit(per_page).all()
return jsonify({
'total': total,
'page': page,
'pages': (total + per_page - 1) // per_page,
'entries': [e.to_dict() for e in entries],
})

291
backend/routes/auth.py Normal file
View File

@@ -0,0 +1,291 @@
"""
Auth routes - Microsoft Entra ID (Azure AD) OAuth2 + superadmin fallback
Flow voor Entra login:
1. Gebruiker klikt "Login met Microsoft"
2. Redirect naar Microsoft /authorize (common endpoint, werkt voor alle tenants)
3. Microsoft redirect terug naar /auth/callback met een code
4. We wisselen de code in voor tokens
5. We lezen het id_token uit → email, naam, oid, tid
6. We zoeken of maken de gebruiker aan in onze DB
7. We koppelen aan de juiste school via e-maildomein of bestaand account
"""
import os
import secrets
import logging
from datetime import datetime
from urllib.parse import urlencode
import requests
from services.audit import audit_log
from flask import (Blueprint, render_template, request, redirect,
url_for, flash, jsonify, session, current_app)
from flask_login import login_user, logout_user, login_required, current_user
from models import User, School
from app import db
logger = logging.getLogger(__name__)
auth_bp = Blueprint('auth', __name__)
ENTRA_AUTHORITY = "https://login.microsoftonline.com/common"
ENTRA_AUTH_URL = f"{ENTRA_AUTHORITY}/oauth2/v2.0/authorize"
ENTRA_TOKEN_URL = f"{ENTRA_AUTHORITY}/oauth2/v2.0/token"
ENTRA_USERINFO_URL = "https://graph.microsoft.com/v1.0/me"
ENTRA_SCOPES = "openid profile email User.Read"
def _entra_client_id():
return current_app.config.get('MICROSOFT_CLIENT_ID')
def _entra_client_secret():
return current_app.config.get('MICROSOFT_CLIENT_SECRET')
def _callback_url():
base = current_app.config.get('BASE_URL', 'http://localhost').rstrip('/')
return f"{base}/auth/callback"
def _find_school_for_email(email: str):
domain = email.split('@')[-1].lower()
schools = School.query.all()
for school in schools:
if school.email_domains and domain in [d.lower() for d in school.email_domains]:
return school
return None
def _get_or_create_user(email, first_name, last_name, oid, tid):
user = User.query.filter_by(oauth_provider='microsoft', oauth_id=oid).first()
if user:
user.first_name = first_name or user.first_name
user.last_name = last_name or user.last_name
user.email = email
user.entra_tenant_id = tid
db.session.commit()
return user, False
user = User.query.filter_by(email=email).first()
if user:
user.oauth_provider = 'microsoft'
user.oauth_id = oid
user.entra_tenant_id = tid
user.first_name = first_name or user.first_name
user.last_name = last_name or user.last_name
db.session.commit()
return user, False
school = _find_school_for_email(email)
user = User(
email=email, first_name=first_name, last_name=last_name,
role='teacher', school_id=school.id if school else None,
oauth_provider='microsoft', oauth_id=oid,
entra_tenant_id=tid, is_active=True,
)
db.session.add(user)
db.session.commit()
return user, True
@auth_bp.route('/superadmin')
def superadmin_page():
"""Directe loginpagina voor de platformbeheerder."""
if current_user.is_authenticated:
return redirect(url_for('pages.dashboard'))
return render_template('superadmin_login.html')
@auth_bp.route('/login')
def login():
if current_user.is_authenticated:
return redirect(url_for('pages.dashboard'))
entra_configured = bool(_entra_client_id() and _entra_client_secret())
org_name = current_app.config.get('ORG_NAME', 'GO! Scholengroep')
return render_template('login.html', entra_configured=entra_configured, org_name=org_name)
@auth_bp.route('/logout')
@login_required
def logout():
audit_log('logout', 'auth')
logout_user()
if _entra_client_id():
post_logout = current_app.config.get('BASE_URL', 'http://localhost') + '/auth/login'
return redirect(
f"https://login.microsoftonline.com/common/oauth2/v2.0/logout"
f"?post_logout_redirect_uri={post_logout}"
)
return redirect(url_for('auth.login'))
@auth_bp.route('/microsoft')
def microsoft_login():
if not _entra_client_id():
flash('Microsoft login is niet geconfigureerd.', 'error')
return redirect(url_for('auth.login'))
state = secrets.token_urlsafe(32)
session['oauth_state'] = state
params = {
'client_id': _entra_client_id(),
'response_type': 'code',
'redirect_uri': _callback_url(),
'scope': ENTRA_SCOPES,
'state': state,
'response_mode': 'query',
'prompt': 'select_account',
}
return redirect(f"{ENTRA_AUTH_URL}?{urlencode(params)}")
@auth_bp.route('/callback')
def microsoft_callback():
error = request.args.get('error')
if error:
logger.warning(f"Entra fout: {error}{request.args.get('error_description')}")
flash('Inloggen via Microsoft mislukt. Probeer opnieuw.', 'error')
return redirect(url_for('auth.login'))
state = request.args.get('state', '')
expected_state = session.pop('oauth_state', None)
if not expected_state or state != expected_state:
logger.warning("OAuth2 state mismatch")
flash('Ongeldige sessie. Probeer opnieuw in te loggen.', 'error')
return redirect(url_for('auth.login'))
code = request.args.get('code')
if not code:
flash('Geen autorisatiecode ontvangen van Microsoft.', 'error')
return redirect(url_for('auth.login'))
try:
token_resp = requests.post(ENTRA_TOKEN_URL, data={
'client_id': _entra_client_id(),
'client_secret': _entra_client_secret(),
'code': code,
'redirect_uri': _callback_url(),
'grant_type': 'authorization_code',
'scope': ENTRA_SCOPES,
}, timeout=15)
token_resp.raise_for_status()
tokens = token_resp.json()
except requests.RequestException as e:
logger.error(f"Token uitwisseling mislukt: {e}")
flash('Kon niet communiceren met Microsoft. Probeer opnieuw.', 'error')
return redirect(url_for('auth.login'))
access_token = tokens.get('access_token')
if not access_token:
flash('Geen access token ontvangen van Microsoft.', 'error')
return redirect(url_for('auth.login'))
try:
graph_resp = requests.get(
ENTRA_USERINFO_URL,
headers={'Authorization': f'Bearer {access_token}'},
params={'$select': 'id,displayName,givenName,surname,mail,userPrincipalName'},
timeout=10
)
graph_resp.raise_for_status()
profile = graph_resp.json()
except requests.RequestException as e:
logger.error(f"Graph API mislukt: {e}")
flash('Kon gebruikersgegevens niet ophalen bij Microsoft.', 'error')
return redirect(url_for('auth.login'))
email = (profile.get('mail') or profile.get('userPrincipalName', '')).lower().strip()
first_name = profile.get('givenName') or ''
last_name = profile.get('surname') or ''
oid = profile.get('id', '')
tid = '' # tenant wordt opgeslagen via de oid
if not email or not oid:
flash('Onvoldoende profielgegevens ontvangen van Microsoft.', 'error')
return redirect(url_for('auth.login'))
user, is_new = _get_or_create_user(email, first_name, last_name, oid, tid)
if not user.is_active:
flash('Uw account is gedeactiveerd. Contacteer uw ICT-beheerder.', 'error')
return redirect(url_for('auth.login'))
if not user.school_id and not user.is_scholengroep_ict and not user.is_superadmin:
flash(
'Uw account is aangemaakt maar nog niet gekoppeld aan een school. '
'Contacteer uw ICT-beheerder.', 'warning'
)
login_user(user, remember=True)
user.last_login = datetime.utcnow()
audit_log('login.success', 'auth', detail={'provider': 'microsoft', 'new_user': is_new})
db.session.commit()
logger.info(f"Entra login: {email} (nieuw: {is_new}, school_id: {user.school_id})")
return redirect(request.args.get('next') or url_for('pages.dashboard'))
@auth_bp.route('/setup', methods=['GET', 'POST'])
def setup():
admin = User.query.filter_by(role='superadmin').first()
if admin and admin.password_hash:
flash('Setup is al voltooid.', 'info')
return redirect(url_for('auth.login'))
if request.method == 'POST':
data = request.get_json() if request.is_json else request.form
password = data.get('password', '')
confirm = data.get('confirm', '')
if len(password) < 12:
if request.is_json:
return jsonify({'error': 'Wachtwoord moet minstens 12 tekens zijn'}), 400
flash('Wachtwoord moet minstens 12 tekens zijn', 'error')
return render_template('setup.html')
if password != confirm:
if request.is_json:
return jsonify({'error': 'Wachtwoorden komen niet overeen'}), 400
flash('Wachtwoorden komen niet overeen', 'error')
return render_template('setup.html')
if not admin:
admin = User(email='admin@leerdoelen.local', role='superadmin',
first_name='Super', last_name='Admin')
db.session.add(admin)
admin.set_password(password)
db.session.commit()
if request.is_json:
return jsonify({'message': 'Setup voltooid', 'redirect': url_for('auth.login')})
flash('Setup voltooid! Je kan nu inloggen.', 'success')
return redirect(url_for('auth.login'))
return render_template('setup.html')
@auth_bp.route('/superadmin-login', methods=['POST'])
def superadmin_login():
"""Fallback login ENKEL voor de superadmin — niet voor gewone gebruikers."""
if current_user.is_authenticated:
return redirect(url_for('pages.dashboard'))
data = request.get_json() if request.is_json else request.form
email = data.get('email', '').strip().lower()
password = data.get('password', '')
user = User.query.filter_by(email=email, role='superadmin', is_active=True).first()
if not user or not user.check_password(password):
if request.is_json:
return jsonify({'error': 'Ongeldig e-mailadres of wachtwoord'}), 401
flash('Ongeldig e-mailadres of wachtwoord', 'error')
return redirect(url_for('auth.login'))
login_user(user, remember=False)
user.last_login = datetime.utcnow()
audit_log('login.success', 'auth', detail={'provider': 'superadmin'}, user_id=user.id)
db.session.commit()
if request.is_json:
return jsonify({'redirect': url_for('pages.dashboard')})
return redirect(url_for('pages.dashboard'))

60
backend/routes/pages.py Normal file
View File

@@ -0,0 +1,60 @@
from flask import Blueprint, render_template, redirect, url_for, current_app
from flask_login import login_required, current_user
pages_bp = Blueprint('pages', __name__)
def _org_name():
return current_app.config.get('ORG_NAME', 'GO! Scholengroep')
def _beheer_required(fn):
"""Decorator: alleen superadmin en scholengroep_ict."""
from functools import wraps
from flask import abort
@wraps(fn)
def wrapper(*args, **kwargs):
if not current_user.is_authenticated:
return redirect(url_for('auth.login'))
if not (current_user.is_superadmin or current_user.role == 'scholengroep_ict'):
abort(403)
return fn(*args, **kwargs)
return wrapper
@pages_bp.route('/')
def index():
if current_user.is_authenticated:
return redirect(url_for('pages.dashboard'))
return redirect(url_for('auth.login'))
@pages_bp.route('/dashboard')
@login_required
def dashboard():
org = _org_name()
if current_user.is_superadmin or current_user.role == 'scholengroep_ict':
return render_template('scholengroep_ict.html',
is_superadmin=current_user.is_superadmin,
org_name=org)
if current_user.role == 'school_ict':
return render_template('school_ict.html', org_name=org)
if current_user.role == 'director':
return render_template('directeur.html', org_name=org)
return render_template('leerkracht.html', org_name=org)
@pages_bp.route('/doelen-beheer')
@login_required
@_beheer_required
def doelen_beheer():
"""Aparte pagina voor het beheer van leerdoelen bestanden."""
return render_template('doelen_beheer.html',
is_superadmin=current_user.is_superadmin,
org_name=_org_name())
@pages_bp.route('/admin')
@login_required
def admin_page():
return redirect(url_for('pages.dashboard'))