Skip to content

Commit 0cec6a1

Browse files
committed
Add trip journaler
A trip journal is built from a sequence of GTFS realtime messages and contains a historical record of where the trips stopped and at what times. Ideally I would eventually clean up the code and test it and document it more. This code was moved from the subwaydata.nyc repo.
1 parent 65dace7 commit 0cec6a1

File tree

2 files changed

+488
-0
lines changed

2 files changed

+488
-0
lines changed

journal/journal.go

Lines changed: 296 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,296 @@
1+
// Package journal contains a tool for building trip journals.
2+
package journal
3+
4+
import (
5+
"fmt"
6+
"os"
7+
"path/filepath"
8+
"sort"
9+
"time"
10+
11+
"github.com/jamespfennell/gtfs"
12+
"github.com/jamespfennell/gtfs/extensions/nycttrips"
13+
)
14+
15+
type Journal struct {
16+
Trips []Trip
17+
// TODO: Metadata
18+
// TODO: filter on start_time
19+
// TODO: log an error if the trip passes the filter but was updated in the last update
20+
//if trip.NumScheduleChanges > 0 {
21+
// a += 1
22+
//}
23+
}
24+
25+
type Trip struct {
26+
TripUID string
27+
TripID string
28+
RouteID string
29+
DirectionID gtfs.DirectionID
30+
StartTime time.Time
31+
VehicleID string
32+
IsAssigned bool
33+
34+
StopTimes []StopTime
35+
36+
// Metadata follows
37+
LastObserved time.Time
38+
MarkedPast *time.Time
39+
NumUpdates int
40+
NumScheduleChanges int
41+
NumScheduleRewrites int
42+
}
43+
44+
type StopTime struct {
45+
StopID string
46+
ArrivalTime *time.Time
47+
DepartureTime *time.Time
48+
Track *string
49+
50+
LastObserved time.Time
51+
MarkedPast *time.Time
52+
}
53+
54+
// TODO: rename Source
55+
type GtfsrtSource interface {
56+
// TODO: change this to return (*gtfs.RealTime, error) and add logic in BuildJournal to be resilient to this
57+
Next() *gtfs.Realtime
58+
}
59+
60+
type DirectoryGtfsrtSource struct {
61+
baseDir string
62+
fileNames []string
63+
startLen int
64+
t time.Time
65+
}
66+
67+
func NewDirectoryGtfsrtSource(baseDir string) (*DirectoryGtfsrtSource, error) {
68+
files, err := os.ReadDir(baseDir)
69+
if err != nil {
70+
return nil, err
71+
}
72+
source := &DirectoryGtfsrtSource{
73+
baseDir: baseDir,
74+
t: time.Now(),
75+
}
76+
for _, file := range files {
77+
source.fileNames = append(source.fileNames, file.Name())
78+
}
79+
source.startLen = len(source.fileNames)
80+
sort.Strings(source.fileNames)
81+
return source, nil
82+
}
83+
84+
func (s *DirectoryGtfsrtSource) Next() *gtfs.Realtime {
85+
for {
86+
if len(s.fileNames) == 0 {
87+
return nil
88+
}
89+
filePath := filepath.Join(s.baseDir, s.fileNames[0])
90+
s.fileNames = s.fileNames[1:]
91+
b, err := os.ReadFile(filePath)
92+
if err != nil {
93+
// TODO: debug logging
94+
// log.Printf("Failed to read %s: %s", filePath, err)
95+
continue
96+
}
97+
extension := nycttrips.Extension(nycttrips.ExtensionOpts{
98+
FilterStaleUnassignedTrips: true,
99+
PreserveMTrainPlatformsInBushwick: false,
100+
})
101+
result, err := gtfs.ParseRealtime(b, &gtfs.ParseRealtimeOptions{
102+
Extension: extension,
103+
})
104+
if err != nil {
105+
// TODO: debug logging
106+
// log.Printf("Failed to parse %s as a GTFS Realtime message: %s", filePath, err)
107+
continue
108+
}
109+
if time.Since(s.t) >= time.Second {
110+
// TODO: debug logging
111+
// log.Printf("Processed %d/%d files\n", s.startLen-len(s.fileNames), s.startLen)
112+
s.t = time.Now()
113+
}
114+
return result
115+
}
116+
}
117+
118+
func BuildJournal(source GtfsrtSource, startTime, endTime time.Time) *Journal {
119+
trips := map[string]*Trip{}
120+
activeTrips := map[string]bool{}
121+
i := 0
122+
for feedMessage := source.Next(); feedMessage != nil; feedMessage = source.Next() {
123+
feedMessage := feedMessage
124+
createdAt := feedMessage.CreatedAt
125+
newActiveTrips := map[string]bool{}
126+
for _, tripUpdate := range feedMessage.Trips {
127+
startTime := tripUpdate.ID.StartDate.Add(tripUpdate.ID.StartTime)
128+
tripUID := fmt.Sprintf("%d%s", startTime.Unix(), tripUpdate.ID.ID[6:])
129+
if existingTrip, ok := trips[tripUID]; ok {
130+
existingTrip.update(&tripUpdate, createdAt)
131+
} else {
132+
trip := Trip{
133+
// One rewrite+change is expected at the start
134+
NumScheduleChanges: -1,
135+
NumScheduleRewrites: -1,
136+
}
137+
trip.update(&tripUpdate, createdAt)
138+
trips[tripUID] = &trip
139+
}
140+
newActiveTrips[tripUID] = true
141+
}
142+
for tripUID := range activeTrips {
143+
if newActiveTrips[tripUID] {
144+
continue
145+
}
146+
trips[tripUID].markPast(createdAt)
147+
}
148+
activeTrips = newActiveTrips
149+
i++
150+
}
151+
var tripIDs []string
152+
for tripID, trip := range trips {
153+
if trip.StartTime.Before(startTime) || endTime.Before(trip.StartTime) {
154+
continue
155+
}
156+
if !trip.IsAssigned {
157+
// TODO: debug logging
158+
// log.Printf("Skipping return of unassigned trip %s\n", trip.TripUID)
159+
continue
160+
}
161+
tripIDs = append(tripIDs, tripID)
162+
}
163+
sort.Strings(tripIDs)
164+
j := &Journal{}
165+
for _, tripID := range tripIDs {
166+
j.Trips = append(j.Trips, *trips[tripID])
167+
}
168+
return j
169+
}
170+
171+
func (trip *Trip) update(tripUpdate *gtfs.Trip, feedCreatedAt time.Time) {
172+
if trip.IsAssigned && tripUpdate.Vehicle == nil {
173+
// TODO: this seems to happen a lot, would be nice to figure out what's happening.
174+
// log.Printf("skipping unassigned update for assigned trip %s\n", trip.TripUID)
175+
return
176+
}
177+
startTime := tripUpdate.ID.StartDate.Add(tripUpdate.ID.StartTime)
178+
vehicle := tripUpdate.GetVehicle()
179+
180+
trip.TripUID = fmt.Sprintf("%d%s", startTime.Unix(), tripUpdate.ID.ID[6:])
181+
trip.TripID = tripUpdate.ID.ID
182+
trip.RouteID = tripUpdate.ID.RouteID
183+
trip.DirectionID = tripUpdate.ID.DirectionID
184+
trip.StartTime = tripUpdate.ID.StartDate.Add(tripUpdate.ID.StartTime)
185+
trip.VehicleID = vehicle.GetID().ID
186+
trip.IsAssigned = trip.IsAssigned || tripUpdate.Vehicle != nil
187+
188+
trip.LastObserved = feedCreatedAt
189+
trip.MarkedPast = nil
190+
trip.NumUpdates += 1
191+
192+
stopTimeUpdates := tripUpdate.StopTimeUpdates
193+
194+
p := createPartition(trip.StopTimes, stopTimeUpdates)
195+
196+
// Mark old stop times as past
197+
for i := range p.past {
198+
p.past[i].markPast(feedCreatedAt)
199+
}
200+
201+
// Update existing stop times
202+
for _, update := range p.updated {
203+
update.existing.update(update.update, feedCreatedAt)
204+
}
205+
206+
// Trim obsolete stop times (e.g. from a schedule change)
207+
trip.StopTimes = trip.StopTimes[:len(p.past)+len(p.updated)]
208+
if len(trip.StopTimes) == 0 {
209+
trip.NumScheduleRewrites += 1
210+
}
211+
212+
// Add new stop times
213+
for i := range p.new {
214+
stopTime := StopTime{}
215+
stopTime.update(&p.new[i], feedCreatedAt)
216+
trip.StopTimes = append(trip.StopTimes, stopTime)
217+
}
218+
if len(p.new) != 0 {
219+
trip.NumScheduleChanges += 1
220+
}
221+
}
222+
223+
func (trip *Trip) markPast(feedCreatedAt time.Time) {
224+
if trip.MarkedPast == nil {
225+
trip.MarkedPast = &feedCreatedAt
226+
}
227+
for i := 0; i < len(trip.StopTimes); i++ {
228+
trip.StopTimes[i].markPast(feedCreatedAt)
229+
}
230+
}
231+
232+
type updated struct {
233+
existing *StopTime
234+
update *gtfs.StopTimeUpdate
235+
}
236+
237+
type partition struct {
238+
past []StopTime
239+
updated []updated
240+
new []gtfs.StopTimeUpdate
241+
}
242+
243+
func createPartition(stopTimes []StopTime, updates []gtfs.StopTimeUpdate) partition {
244+
if len(updates) == 0 {
245+
return partition{
246+
past: stopTimes,
247+
}
248+
}
249+
var p partition
250+
251+
firstUpdatedStopID := *updates[0].StopID
252+
firstUpdatedStopTimeIndex := 0
253+
for i, stopTime := range stopTimes {
254+
if stopTime.StopID == firstUpdatedStopID {
255+
firstUpdatedStopTimeIndex = i
256+
break
257+
}
258+
}
259+
p.past = stopTimes[:firstUpdatedStopTimeIndex]
260+
261+
updateIndex := 0
262+
for i := range stopTimes[firstUpdatedStopTimeIndex:] {
263+
if updateIndex >= len(updates) {
264+
break
265+
}
266+
stopTime := &stopTimes[firstUpdatedStopTimeIndex+i]
267+
update := &updates[updateIndex]
268+
if stopTime.StopID != *update.StopID {
269+
break
270+
}
271+
p.updated = append(p.updated, updated{
272+
existing: stopTime,
273+
update: update,
274+
})
275+
updateIndex += 1
276+
}
277+
278+
p.new = updates[updateIndex:]
279+
280+
return p
281+
}
282+
283+
func (stopTime *StopTime) update(stopTimeUpdate *gtfs.StopTimeUpdate, feedCreatedAt time.Time) {
284+
stopTime.StopID = *stopTimeUpdate.StopID
285+
stopTime.ArrivalTime = stopTimeUpdate.GetArrival().Time
286+
stopTime.DepartureTime = stopTimeUpdate.GetDeparture().Time
287+
stopTime.Track = stopTimeUpdate.NyctTrack
288+
stopTime.LastObserved = feedCreatedAt
289+
stopTime.MarkedPast = nil
290+
}
291+
292+
func (stopTime *StopTime) markPast(feedCreatedAt time.Time) {
293+
if stopTime.MarkedPast == nil {
294+
stopTime.MarkedPast = &feedCreatedAt
295+
}
296+
}

0 commit comments

Comments
 (0)