from datetime import datetime, timedelta, date from django.db.models import Exists, Case, F, Min, OuterRef, Q, Value, When from django.utils.decorators import method_decorator from django.views.decorators.cache import cache_control from django.views.decorators.http import last_modified from django_filters.rest_framework import DjangoFilterBackend from rest_framework import viewsets from rest_framework.filters import OrderingFilter, SearchFilter from trainvel.api.serializers import AgencySerializer, CalendarDateSerializer, CalendarSerializer, \ FeedInfoSerializer, GTFSFeedSerializer, RouteSerializer, StationSerializer, StopSerializer, StopTimeSerializer, \ StopTimeUpdateSerializer, TransferSerializer, TripSerializer, TripUpdateSerializer from trainvel.core.models import Station from trainvel.gtfs.models import Agency, Calendar, CalendarDate, FeedInfo, GTFSFeed, Route, Stop, StopTime, \ StopTimeUpdate, Transfer, Trip, TripUpdate, PickupType CACHE_CONTROL = cache_control(max_age=30) LAST_MODIFIED = last_modified(lambda *args, **kwargs: GTFSFeed.objects.order_by('-last_modified').first().last_modified) LOOKUP_VALUE_REGEX = r"[\w.: |+-]+" class StationViewSet(viewsets.ReadOnlyModelViewSet): queryset = Station.objects.filter(is_suggestable=True) serializer_class = StationSerializer filter_backends = [DjangoFilterBackend, SearchFilter] filterset_fields = '__all__' search_fields = ['name', 'slug', 'info_de', 'info_en', 'info_es', 'info_fr', 'info_it', 'info_nb', 'info_nl', 'info_cs', 'info_da', 'info_hu', 'info_ja', 'info_ko', 'info_pl', 'info_pt', 'info_ru', 'info_sv', 'info_tr', 'info_zh', ] lookup_field = 'slug' lookup_value_regex = LOOKUP_VALUE_REGEX @method_decorator(name='list', decorator=[CACHE_CONTROL, LAST_MODIFIED]) @method_decorator(name='retrieve', decorator=[CACHE_CONTROL, LAST_MODIFIED]) class GTFSFeedViewSet(viewsets.ReadOnlyModelViewSet): queryset = GTFSFeed.objects.all() serializer_class = GTFSFeedSerializer filter_backends = [DjangoFilterBackend] filterset_fields = '__all__' lookup_value_regex = LOOKUP_VALUE_REGEX @method_decorator(name='list', decorator=[CACHE_CONTROL, LAST_MODIFIED]) @method_decorator(name='retrieve', decorator=[CACHE_CONTROL, LAST_MODIFIED]) class AgencyViewSet(viewsets.ReadOnlyModelViewSet): queryset = Agency.objects.all() serializer_class = AgencySerializer filter_backends = [DjangoFilterBackend] filterset_fields = '__all__' lookup_value_regex = LOOKUP_VALUE_REGEX @method_decorator(name='list', decorator=[CACHE_CONTROL, LAST_MODIFIED]) @method_decorator(name='retrieve', decorator=[CACHE_CONTROL, LAST_MODIFIED]) class StopViewSet(viewsets.ReadOnlyModelViewSet): queryset = Stop.objects.all() serializer_class = StopSerializer filter_backends = [DjangoFilterBackend, SearchFilter] filterset_fields = '__all__' search_fields = ['name',] lookup_value_regex = LOOKUP_VALUE_REGEX @method_decorator(name='list', decorator=[CACHE_CONTROL, LAST_MODIFIED]) @method_decorator(name='retrieve', decorator=[CACHE_CONTROL, LAST_MODIFIED]) class RouteViewSet(viewsets.ReadOnlyModelViewSet): queryset = Route.objects.all() serializer_class = RouteSerializer filter_backends = [DjangoFilterBackend] filterset_fields = '__all__' lookup_value_regex = LOOKUP_VALUE_REGEX @method_decorator(name='list', decorator=[CACHE_CONTROL, LAST_MODIFIED]) @method_decorator(name='retrieve', decorator=[CACHE_CONTROL, LAST_MODIFIED]) class TripViewSet(viewsets.ReadOnlyModelViewSet): queryset = Trip.objects.all() serializer_class = TripSerializer filter_backends = [DjangoFilterBackend] filterset_fields = '__all__' lookup_value_regex = LOOKUP_VALUE_REGEX @method_decorator(name='list', decorator=[CACHE_CONTROL, LAST_MODIFIED]) @method_decorator(name='retrieve', decorator=[CACHE_CONTROL, LAST_MODIFIED]) class StopTimeViewSet(viewsets.ReadOnlyModelViewSet): queryset = StopTime.objects.order_by('id').all() serializer_class = StopTimeSerializer filter_backends = [DjangoFilterBackend, OrderingFilter] filterset_fields = '__all__' ordering_fields = ['arrival_time', 'departure_time', 'stop_sequence', ] ordering = ['stop_sequence', ] lookup_value_regex = LOOKUP_VALUE_REGEX @method_decorator(name='list', decorator=[CACHE_CONTROL, LAST_MODIFIED]) @method_decorator(name='retrieve', decorator=[CACHE_CONTROL, LAST_MODIFIED]) class CalendarViewSet(viewsets.ReadOnlyModelViewSet): queryset = Calendar.objects.all() serializer_class = CalendarSerializer filter_backends = [DjangoFilterBackend] filterset_fields = '__all__' lookup_value_regex = LOOKUP_VALUE_REGEX @method_decorator(name='list', decorator=[CACHE_CONTROL, LAST_MODIFIED]) @method_decorator(name='retrieve', decorator=[CACHE_CONTROL, LAST_MODIFIED]) class CalendarDateViewSet(viewsets.ReadOnlyModelViewSet): queryset = CalendarDate.objects.all() serializer_class = CalendarDateSerializer filter_backends = [DjangoFilterBackend] filterset_fields = '__all__' lookup_value_regex = LOOKUP_VALUE_REGEX @method_decorator(name='list', decorator=[CACHE_CONTROL, LAST_MODIFIED]) @method_decorator(name='retrieve', decorator=[CACHE_CONTROL, LAST_MODIFIED]) class TransferViewSet(viewsets.ReadOnlyModelViewSet): queryset = Transfer.objects.all() serializer_class = TransferSerializer filter_backends = [DjangoFilterBackend] lookup_value_regex = LOOKUP_VALUE_REGEX @method_decorator(name='list', decorator=[CACHE_CONTROL, LAST_MODIFIED]) @method_decorator(name='retrieve', decorator=[CACHE_CONTROL, LAST_MODIFIED]) class FeedInfoViewSet(viewsets.ReadOnlyModelViewSet): queryset = FeedInfo.objects.all() serializer_class = FeedInfoSerializer filter_backends = [DjangoFilterBackend] filterset_fields = '__all__' lookup_value_regex = LOOKUP_VALUE_REGEX class TripUpdateViewSet(viewsets.ReadOnlyModelViewSet): queryset = TripUpdate.objects.all() serializer_class = TripUpdateSerializer filter_backends = [DjangoFilterBackend] filterset_fields = '__all__' lookup_value_regex = LOOKUP_VALUE_REGEX class StopTimeUpdateViewSet(viewsets.ReadOnlyModelViewSet): queryset = StopTimeUpdate.objects.all() serializer_class = StopTimeUpdateSerializer filter_backends = [DjangoFilterBackend] filterset_fields = '__all__' lookup_value_regex = LOOKUP_VALUE_REGEX class NextDeparturesViewSet(viewsets.ReadOnlyModelViewSet): queryset = StopTime.objects.none() serializer_class = StopTimeSerializer def get_queryset(self): now = datetime.now() station_slug = self.request.query_params.get('station_slug', None) query_date = date.fromisoformat(self.request.query_params.get('date', now.date().isoformat())) query_time = self.request.query_params.get('time', now.time().isoformat(timespec='seconds')) query_time = timedelta(seconds=int(query_time[:2]) * 3600 + int(query_time[3:5]) * 60 + (int(query_time[6:]) if len(query_time) > 6 else 0)) yesterday = query_date - timedelta(days=1) time_yesterday = query_time + timedelta(days=1) tomorrow = query_date + timedelta(days=1) stop_filter = Q(stop__location_type=0) if station_slug: station = Station.objects.get(is_suggestable=True, slug=station_slug) near_stops = station.get_near_stops() stop_filter = Q(stop_id__in=near_stops.values_list('id', flat=True)) excluded_agencies = ~Q(trip__route__gtfs_feed__excluded_agencies=F('trip__route__agency_id')) not_last_stop = ~Q(stop_sequence=StopTime.objects.filter(trip_id=OuterRef('trip_id')) .filter(pickup_type=PickupType.REGULAR) .order_by('-stop_sequence')[:1].values_list('stop_sequence')) trip_filter = Q() if self.request.query_params.get('route_name', None): trip_filter &= Q(trip__route_name__in=self.request.query_params.get('route_name').split(',')) if self.request.query_params.get('transport_type', None): trip_filter &= Q(trip__route__type__in=self.request.query_params.get('transport_type').split(',')) if self.request.query_params.get('long_distance', None) is not None: long_distance = str(self.request.query_params.get('long_distance')) == 'true' trip_filter &= Q(trip__long_distance=long_distance) def calendar_filter(d: date): return Q(trip__service_id__in=CalendarDate.objects.filter(date=d, exception_type=1) .values_list('service_id')) \ | Q(trip__service_id__in=Calendar.objects.filter( start_date__lte=d, end_date__gte=d, **{f"{d:%A}".lower(): True}) .filter(~Q(id__in=CalendarDate.objects.filter(date=d, exception_type=2) .values_list('service_id', flat=True))) .values_list('id')) def stop_time_update_qs(d: date): return StopTimeUpdate.objects.filter(trip_update__start_date=d) \ .exclude(departure_time=datetime.fromtimestamp(0)).filter(stop_time_id=OuterRef('pk')) def departure_time_real(d: date): return Case( When( condition=Exists(stop_time_update_qs(d)), then=F('departure_time') + stop_time_update_qs(d).values('departure_delay'), ), default=F('departure_time'), ) def canceled_filter(d: date): return Exists(stop_time_update_qs(d).filter(Q(schedule_relationship=1) | Q(schedule_relationship=3))) qs_today = StopTime.objects.filter(stop_filter) \ .filter(excluded_agencies) \ .filter(not_last_stop) \ .filter(trip_filter) \ .annotate(departure_time_real=departure_time_real(query_date)) \ .filter(departure_time_real__gte=query_time) \ .filter(Q(pickup_type=PickupType.REGULAR) | canceled_filter(query_date)) \ .filter(calendar_filter(query_date)) \ .annotate(departure_date=Value(query_date)) \ .annotate(departure_time_24h=F('departure_time')) qs_yesterday = StopTime.objects.filter(stop_filter) \ .filter(excluded_agencies) \ .filter(not_last_stop) \ .filter(trip_filter) \ .annotate(departure_time_real=departure_time_real(query_date)) \ .filter(departure_time_real__gte=time_yesterday) \ .filter(Q(pickup_type=PickupType.REGULAR) | canceled_filter(yesterday)) \ .filter(calendar_filter(yesterday)) \ .annotate(departure_date=Value(yesterday)) \ .annotate(departure_time_24h=F('departure_time') - timedelta(days=1)) qs_tomorrow = StopTime.objects.filter(stop_filter) \ .filter(excluded_agencies) \ .filter(not_last_stop) \ .filter(trip_filter) \ .annotate(departure_time_real=departure_time_real(query_date)) \ .filter(departure_time_real__gte=timedelta(0)) \ .filter(Q(pickup_type=PickupType.REGULAR) | canceled_filter(tomorrow)) \ .filter(calendar_filter(tomorrow)) \ .annotate(departure_date=Value(tomorrow)) \ .annotate(departure_time_24h=F('departure_time') + timedelta(days=1)) return qs_today.union(qs_yesterday).union(qs_tomorrow).order_by("departure_time_24h").all() class NextArrivalsViewSet(viewsets.ReadOnlyModelViewSet): queryset = StopTime.objects.none() serializer_class = StopTimeSerializer filter_backends = [DjangoFilterBackend] def get_queryset(self): now = datetime.now() station_slug = self.request.query_params.get('station_slug', None) query_date = date.fromisoformat(self.request.query_params.get('date', now.date().isoformat())) query_time = self.request.query_params.get('time', now.time().isoformat(timespec='seconds')) query_time = timedelta(seconds=int(query_time[:2]) * 3600 + int(query_time[3:5]) * 60 + (int(query_time[6:]) if len(query_time) > 6 else 0)) query_time -= timedelta(minutes=5) # Keep the last trains of the 5 previous minutes yesterday = query_date - timedelta(days=1) time_yesterday = query_time + timedelta(days=1) tomorrow = query_date + timedelta(days=1) stop_filter = Q(stop__location_type=0) if station_slug: station = Station.objects.get(is_suggestable=True, slug=station_slug) near_stops = station.get_near_stops() stop_filter = Q(stop_id__in=near_stops.values_list('id', flat=True)) excluded_agencies = ~Q(trip__route__gtfs_feed__excluded_agencies=F('trip__route__agency_id')) not_first_stop = ~Q(stop_sequence=StopTime.objects.filter(trip_id=OuterRef('trip_id')) .filter(drop_off_type=PickupType.REGULAR) .order_by('stop_sequence')[:1].values_list('stop_sequence')) trip_filter = Q() if self.request.query_params.get('route_name', None): trip_filter &= Q(trip__route_name__in=self.request.query_params.get('route_name').split(',')) if self.request.query_params.get('transport_type', None): trip_filter &= Q(trip__route__type__in=self.request.query_params.get('transport_type').split(',')) if self.request.query_params.get('long_distance', None) is not None: long_distance = str(self.request.query_params.get('long_distance')) == 'true' trip_filter &= Q(trip__long_distance=long_distance) def calendar_filter(d: date): return Q(trip__service_id__in=CalendarDate.objects.filter(date=d, exception_type=1) .values_list('service_id')) \ | Q(trip__service_id__in=Calendar.objects.filter( start_date__lte=d, end_date__gte=d, **{f"{d:%A}".lower(): True}) .filter(~Q(id__in=CalendarDate.objects.filter(date=d, exception_type=2) .values_list('service_id', flat=True))) .values_list('id')) def stop_time_update_qs(d: date): return StopTimeUpdate.objects.filter(trip_update__start_date=d) \ .exclude(arrival_time=datetime.fromtimestamp(0)).filter(stop_time_id=OuterRef('pk')) def arrival_time_real(d: date): return Case( When( condition=Exists(stop_time_update_qs(d)), then=F('arrival_time') + stop_time_update_qs(d).values('arrival_delay'), ), default=F('arrival_time'), ) def canceled_filter(d: date): return Exists(stop_time_update_qs(d).filter(Q(schedule_relationship=1) | Q(schedule_relationship=3))) qs_today = StopTime.objects.filter(stop_filter) \ .filter(excluded_agencies) \ .filter(not_first_stop) \ .filter(trip_filter) \ .annotate(arrival_time_real=arrival_time_real(query_date)) \ .filter(arrival_time_real__gte=query_time) \ .filter(Q(drop_off_type=PickupType.REGULAR) | canceled_filter(query_date)) \ .filter(calendar_filter(query_date)) \ .annotate(arrival_date=Value(query_date)) \ .annotate(arrival_time_24h=F('arrival_time')) qs_yesterday = StopTime.objects.filter(stop_filter) \ .filter(excluded_agencies) \ .filter(not_first_stop) \ .filter(trip_filter) \ .annotate(arrival_time_real=arrival_time_real(yesterday)) \ .filter(arrival_time_real__gte=time_yesterday) \ .filter(Q(drop_off_type=PickupType.REGULAR) | canceled_filter(yesterday)) \ .filter(calendar_filter(yesterday)) \ .annotate(arrival_date=Value(yesterday)) \ .annotate(arrival_time_24h=F('arrival_time') - timedelta(days=1)) qs_tomorrow = StopTime.objects.filter(stop_filter) \ .filter(excluded_agencies) \ .filter(not_first_stop) \ .filter(trip_filter) \ .annotate(arrival_time_real=arrival_time_real(tomorrow)) \ .filter(arrival_time_real__gte=timedelta(0)) \ .filter(Q(drop_off_type=PickupType.REGULAR) | canceled_filter(tomorrow)) \ .filter(calendar_filter(tomorrow)) \ .annotate(arrival_date=Value(tomorrow)) \ .annotate(arrival_time_24h=F('arrival_time') + timedelta(days=1)) return qs_today.union(qs_yesterday).union(qs_tomorrow).order_by("arrival_time_24h").all()