6
6
using System . Threading ;
7
7
using System . Threading . Tasks ;
8
8
using k8s . Autorest ;
9
+ using k8s . Models ;
9
10
using Octopus . Diagnostics ;
10
11
using Octopus . Tentacle . Contracts ;
11
12
@@ -21,39 +22,65 @@ class KubernetesPodLogService : KubernetesService, IKubernetesPodLogService
21
22
readonly IKubernetesPodMonitor podMonitor ;
22
23
readonly ITentacleScriptLogProvider scriptLogProvider ;
23
24
readonly IScriptPodSinceTimeStore scriptPodSinceTimeStore ;
25
+ readonly IKubernetesEventService eventService ;
24
26
25
- public KubernetesPodLogService ( IKubernetesClientConfigProvider configProvider , IKubernetesPodMonitor podMonitor , ITentacleScriptLogProvider scriptLogProvider , IScriptPodSinceTimeStore scriptPodSinceTimeStore , ISystemLog log )
27
+ public KubernetesPodLogService (
28
+ IKubernetesClientConfigProvider configProvider ,
29
+ IKubernetesPodMonitor podMonitor ,
30
+ ITentacleScriptLogProvider scriptLogProvider ,
31
+ IScriptPodSinceTimeStore scriptPodSinceTimeStore ,
32
+ IKubernetesEventService eventService ,
33
+ ISystemLog log )
26
34
: base ( configProvider , log )
27
35
{
28
36
this . podMonitor = podMonitor ;
29
37
this . scriptLogProvider = scriptLogProvider ;
30
38
this . scriptPodSinceTimeStore = scriptPodSinceTimeStore ;
39
+ this . eventService = eventService ;
31
40
}
32
41
33
42
public async Task < ( IReadOnlyCollection < ProcessOutput > Outputs , long NextSequenceNumber ) > GetLogs ( ScriptTicket scriptTicket , long lastLogSequence , CancellationToken cancellationToken )
34
43
{
35
44
var tentacleScriptLog = scriptLogProvider . GetOrCreate ( scriptTicket ) ;
36
45
var podName = scriptTicket . ToKubernetesScriptPodName ( ) ;
37
46
38
- var podLogs = await GetPodLogs ( ) ;
47
+ //we start both tasks now so we can overlap the API calls
48
+ var podLogsTask = GetPodLogs ( ) ;
49
+ var podEventsTask = GetPodEvents ( scriptTicket , podName , cancellationToken ) ;
50
+
51
+ var podLogs = await podLogsTask ;
39
52
if ( podLogs . Outputs . Any ( ) )
40
53
{
41
54
var nextSinceTime = podLogs . Outputs . Max ( o => o . Occurred ) ;
42
- scriptPodSinceTimeStore . UpdateSinceTime ( scriptTicket , nextSinceTime ) ;
43
-
55
+ scriptPodSinceTimeStore . UpdatePodLogsSinceTime ( scriptTicket , nextSinceTime ) ;
56
+
44
57
//We can use our EOS marker to detect completion quicker than the Pod status
45
58
if ( podLogs . ExitCode != null )
46
59
podMonitor . MarkAsCompleted ( scriptTicket , podLogs . ExitCode . Value ) ;
47
60
}
48
61
49
62
//We are making the assumption that the clock on the Tentacle Pod is in sync with API Server
50
63
var tentacleLogs = tentacleScriptLog . PopLogs ( ) ;
51
- var combinedLogs = podLogs . Outputs . Concat ( tentacleLogs ) . OrderBy ( o => o . Occurred ) . ToList ( ) ;
64
+ var podEventLogs = await podEventsTask ;
65
+
66
+ var combinedLogs = podLogs
67
+ . Outputs
68
+ . Concat ( tentacleLogs )
69
+ . Concat ( podEventLogs )
70
+ . OrderBy ( o => o . Occurred )
71
+ . SelectMany ( o => o switch
72
+ {
73
+ //if this is a wrapped output, expand it in place
74
+ WrappedProcessOutput wrappedOutput => wrappedOutput . Expand ( ) ,
75
+ _ => new [ ] { o }
76
+ } )
77
+ . ToList ( ) ;
78
+
52
79
return ( combinedLogs , podLogs . NextSequenceNumber ) ;
53
80
54
81
async Task < ( IReadOnlyCollection < ProcessOutput > Outputs , long NextSequenceNumber , int ? ExitCode ) > GetPodLogs ( )
55
82
{
56
- var sinceTime = scriptPodSinceTimeStore . GetSinceTime ( scriptTicket ) ;
83
+ var sinceTime = scriptPodSinceTimeStore . GetPodLogsSinceTime ( scriptTicket ) ;
57
84
try
58
85
{
59
86
return await GetPodLogsWithSinceTime ( sinceTime ) ;
@@ -63,7 +90,7 @@ public KubernetesPodLogService(IKubernetesClientConfigProvider configProvider, I
63
90
var message = $ "Unexpected Pod log line numbers found with sinceTime='{ sinceTime } ', loading all logs";
64
91
tentacleScriptLog . Verbose ( message ) ;
65
92
Log . Warn ( ex , message ) ;
66
-
93
+
67
94
//If we somehow come across weird/missing line numbers, try load the whole Pod logs to see if that helps
68
95
return await GetPodLogsWithSinceTime ( null ) ;
69
96
}
@@ -77,13 +104,64 @@ public KubernetesPodLogService(IKubernetesClientConfigProvider configProvider, I
77
104
78
105
async Task < ( IReadOnlyCollection < ProcessOutput > Outputs , long NextSequenceNumber , int ? ExitCode ) > ReadPodLogsFromStream ( Stream stream )
79
106
{
80
- using ( var reader = new StreamReader ( stream ) )
81
- {
82
- return await PodLogReader . ReadPodLogs ( lastLogSequence , reader ) ;
83
- }
107
+ using var reader = new StreamReader ( stream ) ;
108
+ return await PodLogReader . ReadPodLogs ( lastLogSequence , reader ) ;
84
109
}
85
110
}
86
111
112
+ async Task < IEnumerable < ProcessOutput > > GetPodEvents ( ScriptTicket scriptTicket , string podName , CancellationToken cancellationToken )
113
+ {
114
+ //if we don't want to write pod events to the task log, don't do anything
115
+ if ( KubernetesConfig . DisablePodEventsInTaskLog )
116
+ {
117
+ return Array . Empty < ProcessOutput > ( ) ;
118
+ }
119
+
120
+
121
+ var sinceTime = scriptPodSinceTimeStore . GetPodEventsSinceTime ( scriptTicket ) ;
122
+
123
+ var allEvents = await eventService . FetchAllEventsAsync ( KubernetesConfig . Namespace , podName , cancellationToken ) ;
124
+ if ( allEvents is null )
125
+ {
126
+ return Array . Empty < ProcessOutput > ( ) ;
127
+ }
128
+
129
+ var relevantEvents = allEvents . Items
130
+ . Select ( e => ( Event : e , Occurred : EventHelpers . GetLatestTimestampInEvent ( e ) ) )
131
+ . Where ( x => x . Occurred . HasValue )
132
+ . Select ( x => ( x . Event , Occurred : new DateTimeOffset ( x . Occurred ! . Value , TimeSpan . Zero ) ) )
133
+ . OrderBy ( x => x . Occurred )
134
+ . SkipWhile ( e => e . Occurred <= sinceTime ) ;
135
+
136
+ var events = relevantEvents . Select ( ( x ) =>
137
+ {
138
+ var ( ev , occurred ) = x ;
139
+
140
+ var formattedMessage = $ "[POD EVENT] { ev . Reason } | { ev . Message } (Count: { ev . Count ?? 1 } )";
141
+
142
+ if ( ev . IsWarning ( ) )
143
+ return new WrappedProcessOutput ( ProcessOutputSource . StdOut , formattedMessage , occurred , "warning" ) ;
144
+
145
+ //if we are pulling a container, show it as a "wait"
146
+ if ( ev . IsPullingReason ( ) )
147
+ return new WrappedProcessOutput ( ProcessOutputSource . StdOut , formattedMessage , occurred , "wait" ) ;
148
+
149
+ //if this is a Pulled event, and we had a Pulling event with the same path (i.e. the same container), then show this as "wait"
150
+ if ( ev . IsPulledReason ( ) && allEvents . Items . Any ( e => e . IsPullingReason ( ) && ev . InvolvedObject . FieldPath == e . InvolvedObject . FieldPath ) )
151
+ return new WrappedProcessOutput ( ProcessOutputSource . StdOut , formattedMessage , occurred , "wait" ) ;
152
+
153
+ return new ProcessOutput ( ProcessOutputSource . Debug , formattedMessage , occurred ) ;
154
+ } )
155
+ . ToArray ( ) ;
156
+
157
+ if ( events . Any ( ) )
158
+ {
159
+ //update the events since time, so we don't get duplicate events
160
+ scriptPodSinceTimeStore . UpdatePodEventsSinceTime ( scriptTicket , events . Max ( o => o . Occurred ) ) ;
161
+ }
162
+
163
+ return events ;
164
+ }
87
165
88
166
async Task < Stream ? > GetLogStream ( string podName , DateTimeOffset ? sinceTime , CancellationToken cancellationToken )
89
167
{
@@ -107,5 +185,36 @@ public KubernetesPodLogService(IKubernetesClientConfigProvider configProvider, I
107
185
}
108
186
}
109
187
}
188
+
189
+ class WrappedProcessOutput : ProcessOutput
190
+ {
191
+ static readonly TimeSpan OneTick = TimeSpan . FromTicks ( 1 ) ;
192
+ readonly string wrapper ;
193
+
194
+ public WrappedProcessOutput ( ProcessOutputSource source , string text , DateTimeOffset occurred , string wrapper )
195
+ : base ( source , text , occurred )
196
+ {
197
+ this . wrapper = wrapper ;
198
+ }
199
+
200
+ public IEnumerable < ProcessOutput > Expand ( )
201
+ {
202
+ return new [ ]
203
+ {
204
+ //we add the service messages one tick before and after so they are correctly ordered
205
+ new ProcessOutput ( ProcessOutputSource . StdOut , $ "##octopus[stdout-{ wrapper } ]", Occurred . Subtract ( OneTick ) ) ,
206
+ new ProcessOutput ( Source , Text , Occurred ) ,
207
+ new ProcessOutput ( ProcessOutputSource . StdOut , "##octopus[stdout-default]" , Occurred . Add ( OneTick ) ) ,
208
+ } ;
209
+ }
210
+ }
211
+ }
212
+
213
+ public static class EventExtensions
214
+ {
215
+ public static bool IsPullingReason ( this Corev1Event @event ) => @event . Reason . Equals ( "Pulling" , StringComparison . OrdinalIgnoreCase ) ;
216
+ public static bool IsPulledReason ( this Corev1Event @event ) => @event . Reason . Equals ( "Pulled" , StringComparison . OrdinalIgnoreCase ) ;
217
+ public static bool IsWarning ( this Corev1Event @event ) => @event . Type . Equals ( "Warning" , StringComparison . OrdinalIgnoreCase ) ;
218
+
110
219
}
111
220
}
0 commit comments