Skip to content

Commit 86bb6b1

Browse files
committed
timerdata
1 parent ece2beb commit 86bb6b1

26 files changed

+251
-120
lines changed

runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -135,17 +135,18 @@ public TimersImpl(StateNamespace namespace) {
135135

136136
@Override
137137
public void setTimer(Instant timestamp, TimeDomain timeDomain) {
138-
timerInternals.setTimer(TimerData.of(namespace, timestamp, timestamp, timeDomain));
138+
timerInternals.setTimer(TimerData.of(namespace, timestamp, timestamp, timeDomain, false));
139139
}
140140

141141
@Override
142142
public void setTimer(Instant timestamp, Instant outputTimestamp, TimeDomain timeDomain) {
143-
timerInternals.setTimer(TimerData.of(namespace, timestamp, outputTimestamp, timeDomain));
143+
timerInternals.setTimer(
144+
TimerData.of(namespace, timestamp, outputTimestamp, timeDomain, false));
144145
}
145146

146147
@Override
147148
public void deleteTimer(Instant timestamp, TimeDomain timeDomain) {
148-
timerInternals.deleteTimer(TimerData.of(namespace, timestamp, timestamp, timeDomain));
149+
timerInternals.deleteTimer(TimerData.of(namespace, timestamp, timestamp, timeDomain, false));
149150
}
150151

151152
@Override

runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -600,7 +600,7 @@ public String getErrorContext() {
600600
// Set a timer to continue processing this element.
601601
timerInternals.setTimer(
602602
TimerInternals.TimerData.of(
603-
stateNamespace, wakeupTime, wakeupTime, TimeDomain.PROCESSING_TIME));
603+
stateNamespace, wakeupTime, wakeupTime, TimeDomain.PROCESSING_TIME, false));
604604
}
605605

606606
private DoFnInvoker.ArgumentProvider<InputT, OutputT> wrapOptionsAsSetup(

runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java

Lines changed: 39 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,8 @@ abstract class TimerData implements Comparable<TimerData> {
188188

189189
public abstract boolean getDeleted();
190190

191+
public abstract boolean causedByDrain();
192+
191193
// When adding a new field, make sure to add it to the compareTo() method.
192194

193195
/** Construct a {@link TimerData} for the given parameters. */
@@ -196,9 +198,10 @@ public static TimerData of(
196198
StateNamespace namespace,
197199
Instant timestamp,
198200
Instant outputTimestamp,
199-
TimeDomain domain) {
201+
TimeDomain domain,
202+
boolean causedByDrain) {
200203
return new AutoValue_TimerInternals_TimerData(
201-
timerId, "", namespace, timestamp, outputTimestamp, domain, false);
204+
timerId, "", namespace, timestamp, outputTimestamp, domain, false, causedByDrain);
202205
}
203206

204207
/**
@@ -211,19 +214,41 @@ public static TimerData of(
211214
StateNamespace namespace,
212215
Instant timestamp,
213216
Instant outputTimestamp,
214-
TimeDomain domain) {
217+
TimeDomain domain,
218+
boolean causedByDrain) {
215219
return new AutoValue_TimerInternals_TimerData(
216-
timerId, timerFamilyId, namespace, timestamp, outputTimestamp, domain, false);
220+
timerId,
221+
timerFamilyId,
222+
namespace,
223+
timestamp,
224+
outputTimestamp,
225+
domain,
226+
false,
227+
causedByDrain);
228+
}
229+
230+
public static TimerData of(
231+
String timerId,
232+
String timerFamilyId,
233+
StateNamespace namespace,
234+
Instant timestamp,
235+
Instant outputTimestamp,
236+
TimeDomain domain) {
237+
return of(timerId, timerFamilyId, namespace, timestamp, outputTimestamp, domain, false);
217238
}
218239

219240
/**
220241
* Construct a {@link TimerData} for the given parameters except for timer ID. Timer ID is
221242
* deterministically generated from the {@code timestamp} and {@code domain}.
222243
*/
223244
public static TimerData of(
224-
StateNamespace namespace, Instant timestamp, Instant outputTimestamp, TimeDomain domain) {
245+
StateNamespace namespace,
246+
Instant timestamp,
247+
Instant outputTimestamp,
248+
TimeDomain domain,
249+
boolean causedByDrain) {
225250
String timerId = String.valueOf(domain.ordinal()) + ':' + timestamp.getMillis();
226-
return of(timerId, namespace, timestamp, outputTimestamp, domain);
251+
return of(timerId, namespace, timestamp, outputTimestamp, domain, causedByDrain);
227252
}
228253

229254
public TimerData deleted() {
@@ -234,7 +259,8 @@ public TimerData deleted() {
234259
getTimestamp(),
235260
getOutputTimestamp(),
236261
getDomain(),
237-
true);
262+
true,
263+
causedByDrain());
238264
}
239265

240266
/**
@@ -272,7 +298,9 @@ public String stringKey() {
272298
+ "/"
273299
+ getTimerFamilyId()
274300
+ ":"
275-
+ getTimerId();
301+
+ getTimerId()
302+
+ ":"
303+
+ causedByDrain();
276304
}
277305
}
278306

@@ -309,7 +337,8 @@ public TimerData decode(InputStream inStream) throws CoderException, IOException
309337
Instant timestamp = INSTANT_CODER.decode(inStream);
310338
Instant outputTimestamp = INSTANT_CODER.decode(inStream);
311339
TimeDomain domain = TimeDomain.valueOf(STRING_CODER.decode(inStream));
312-
return TimerData.of(timerId, timerFamilyId, namespace, timestamp, outputTimestamp, domain);
340+
return TimerData.of(
341+
timerId, timerFamilyId, namespace, timestamp, outputTimestamp, domain, false);
313342
}
314343

315344
@Override
@@ -355,7 +384,7 @@ public TimerData decode(InputStream inStream) throws CoderException, IOException
355384
StateNamespaces.fromString(STRING_CODER.decode(inStream), windowCoder);
356385
Instant timestamp = INSTANT_CODER.decode(inStream);
357386
TimeDomain domain = TimeDomain.valueOf(STRING_CODER.decode(inStream));
358-
return TimerData.of(timerId, namespace, timestamp, timestamp, domain);
387+
return TimerData.of(timerId, namespace, timestamp, timestamp, domain, false);
359388
}
360389

361390
@Override

runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,9 @@ public class InMemoryTimerInternalsTest {
4141
public void testFiringEventTimers() throws Exception {
4242
InMemoryTimerInternals underTest = new InMemoryTimerInternals();
4343
TimerData eventTimer1 =
44-
TimerData.of(ID1, NS1, new Instant(19), new Instant(19), TimeDomain.EVENT_TIME);
44+
TimerData.of(ID1, NS1, new Instant(19), new Instant(19), TimeDomain.EVENT_TIME, false);
4545
TimerData eventTimer2 =
46-
TimerData.of(ID2, NS1, new Instant(29), new Instant(29), TimeDomain.EVENT_TIME);
46+
TimerData.of(ID2, NS1, new Instant(29), new Instant(29), TimeDomain.EVENT_TIME, false);
4747

4848
underTest.setTimer(eventTimer1);
4949
underTest.setTimer(eventTimer2);
@@ -111,9 +111,9 @@ public void testDeletionById() throws Exception {
111111
public void testFiringProcessingTimeTimers() throws Exception {
112112
InMemoryTimerInternals underTest = new InMemoryTimerInternals();
113113
TimerData processingTime1 =
114-
TimerData.of(NS1, new Instant(19), new Instant(19), TimeDomain.PROCESSING_TIME);
114+
TimerData.of(NS1, new Instant(19), new Instant(19), TimeDomain.PROCESSING_TIME, false);
115115
TimerData processingTime2 =
116-
TimerData.of(NS1, new Instant(29), new Instant(29), TimeDomain.PROCESSING_TIME);
116+
TimerData.of(NS1, new Instant(29), new Instant(29), TimeDomain.PROCESSING_TIME, false);
117117

118118
underTest.setTimer(processingTime1);
119119
underTest.setTimer(processingTime2);
@@ -142,19 +142,19 @@ public void testFiringProcessingTimeTimers() throws Exception {
142142
public void testTimerOrdering() throws Exception {
143143
InMemoryTimerInternals underTest = new InMemoryTimerInternals();
144144
TimerData eventTime1 =
145-
TimerData.of(NS1, new Instant(19), new Instant(19), TimeDomain.EVENT_TIME);
145+
TimerData.of(NS1, new Instant(19), new Instant(19), TimeDomain.EVENT_TIME, false);
146146
TimerData processingTime1 =
147-
TimerData.of(NS1, new Instant(19), new Instant(19), TimeDomain.PROCESSING_TIME);
147+
TimerData.of(NS1, new Instant(19), new Instant(19), TimeDomain.PROCESSING_TIME, false);
148148
TimerData synchronizedProcessingTime1 =
149149
TimerData.of(
150-
NS1, new Instant(19), new Instant(19), TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
150+
NS1, new Instant(19), new Instant(19), TimeDomain.SYNCHRONIZED_PROCESSING_TIME, false);
151151
TimerData eventTime2 =
152-
TimerData.of(NS1, new Instant(29), new Instant(29), TimeDomain.EVENT_TIME);
152+
TimerData.of(NS1, new Instant(29), new Instant(29), TimeDomain.EVENT_TIME, false);
153153
TimerData processingTime2 =
154-
TimerData.of(NS1, new Instant(29), new Instant(29), TimeDomain.PROCESSING_TIME);
154+
TimerData.of(NS1, new Instant(29), new Instant(29), TimeDomain.PROCESSING_TIME, false);
155155
TimerData synchronizedProcessingTime2 =
156156
TimerData.of(
157-
NS1, new Instant(29), new Instant(29), TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
157+
NS1, new Instant(29), new Instant(29), TimeDomain.SYNCHRONIZED_PROCESSING_TIME, false);
158158

159159
underTest.setTimer(processingTime1);
160160
underTest.setTimer(eventTime1);
@@ -188,9 +188,9 @@ public void testTimerOrdering() throws Exception {
188188
public void testDeduplicate() throws Exception {
189189
InMemoryTimerInternals underTest = new InMemoryTimerInternals();
190190
TimerData eventTime =
191-
TimerData.of(NS1, new Instant(19), new Instant(19), TimeDomain.EVENT_TIME);
191+
TimerData.of(NS1, new Instant(19), new Instant(19), TimeDomain.EVENT_TIME, false);
192192
TimerData processingTime =
193-
TimerData.of(NS1, new Instant(19), new Instant(19), TimeDomain.PROCESSING_TIME);
193+
TimerData.of(NS1, new Instant(19), new Instant(19), TimeDomain.PROCESSING_TIME, false);
194194
underTest.setTimer(eventTime);
195195
underTest.setTimer(eventTime);
196196
underTest.setTimer(processingTime);

runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ public void testEncodeDecodeEqual() throws Exception {
4848
StateNamespaces.global(),
4949
new Instant(500L),
5050
new Instant(500L),
51-
TimeDomain.EVENT_TIME));
51+
TimeDomain.EVENT_TIME,
52+
false));
5253
Iterable<WindowedValue<Integer>> elements =
5354
ImmutableList.of(
5455
WindowedValues.valueInGlobalWindow(1),

runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -574,7 +574,11 @@ public void fireTimer(W window, Instant timestamp, TimeDomain domain) throws Exc
574574
ArrayList<TimerData> timers = new ArrayList<>(1);
575575
timers.add(
576576
TimerData.of(
577-
StateNamespaces.window(windowFn.windowCoder(), window), timestamp, timestamp, domain));
577+
StateNamespaces.window(windowFn.windowCoder(), window),
578+
timestamp,
579+
timestamp,
580+
domain,
581+
false));
578582
runner.onTimers(timers);
579583
runner.persist();
580584
}
@@ -588,7 +592,8 @@ public void fireTimers(W window, TimestampedValue<TimeDomain>... timers) throws
588592
StateNamespaces.window(windowFn.windowCoder(), window),
589593
timer.getTimestamp(),
590594
timer.getTimestamp(),
591-
timer.getValue()));
595+
timer.getValue(),
596+
false));
592597
}
593598
runner.onTimers(timerData);
594599
runner.persist();

runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -701,7 +701,8 @@ public void onTimer(OnTimerContext context) {
701701
StateNamespaces.window(windowCoder, (W) context.window()),
702702
context.fireTimestamp(),
703703
context.timestamp(),
704-
context.timeDomain()));
704+
context.timeDomain(),
705+
false));
705706
}
706707
}
707708

runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,8 @@ public void testOnTimerCalled() {
317317
StateNamespaces.window(IntervalWindow.getCoder(), window),
318318
timestamp,
319319
timestamp,
320-
TimeDomain.EVENT_TIME)));
320+
TimeDomain.EVENT_TIME,
321+
false)));
321322
}
322323

323324
private static class TestDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
@@ -359,7 +360,8 @@ public <KeyT> void onTimer(
359360
StateNamespaces.window(IntervalWindow.getCoder(), (IntervalWindow) window),
360361
timestamp,
361362
outputTimestamp,
362-
timeDomain));
363+
timeDomain,
364+
false));
363365
}
364366

365367
@Override

runners/core-java/src/test/java/org/apache/beam/runners/core/TimerInternalsTest.java

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,8 @@ public void testTimerDataCoder() throws Exception {
4646
StateNamespaces.global(),
4747
new Instant(0),
4848
new Instant(0),
49-
TimeDomain.EVENT_TIME));
49+
TimeDomain.EVENT_TIME,
50+
false));
5051

5152
Coder<IntervalWindow> windowCoder = IntervalWindow.getCoder();
5253
CoderProperties.coderDecodeEncodeEqual(
@@ -57,7 +58,8 @@ public void testTimerDataCoder() throws Exception {
5758
windowCoder, new IntervalWindow(new Instant(0), new Instant(100))),
5859
new Instant(99),
5960
new Instant(99),
60-
TimeDomain.PROCESSING_TIME));
61+
TimeDomain.PROCESSING_TIME,
62+
false));
6163
}
6264

6365
@Test
@@ -69,12 +71,13 @@ public void testCoderIsSerializableWithWellKnownCoderType() {
6971
public void testCompareEqual() {
7072
Instant timestamp = new Instant(100);
7173
StateNamespace namespace = StateNamespaces.global();
72-
TimerData timer = TimerData.of("id", namespace, timestamp, timestamp, TimeDomain.EVENT_TIME);
74+
TimerData timer =
75+
TimerData.of("id", namespace, timestamp, timestamp, TimeDomain.EVENT_TIME, false);
7376

7477
assertThat(
7578
timer,
7679
comparesEqualTo(
77-
TimerData.of("id", namespace, timestamp, timestamp, TimeDomain.EVENT_TIME)));
80+
TimerData.of("id", namespace, timestamp, timestamp, TimeDomain.EVENT_TIME, false)));
7881
}
7982

8083
@Test
@@ -84,9 +87,9 @@ public void testCompareByTimestamp() {
8487
StateNamespace namespace = StateNamespaces.global();
8588

8689
TimerData firstTimer =
87-
TimerData.of(namespace, firstTimestamp, firstTimestamp, TimeDomain.EVENT_TIME);
90+
TimerData.of(namespace, firstTimestamp, firstTimestamp, TimeDomain.EVENT_TIME, false);
8891
TimerData secondTimer =
89-
TimerData.of(namespace, secondTimestamp, secondTimestamp, TimeDomain.EVENT_TIME);
92+
TimerData.of(namespace, secondTimestamp, secondTimestamp, TimeDomain.EVENT_TIME, false);
9093

9194
assertThat(firstTimer, lessThan(secondTimer));
9295
}
@@ -96,10 +99,13 @@ public void testCompareByDomain() {
9699
Instant timestamp = new Instant(100);
97100
StateNamespace namespace = StateNamespaces.global();
98101

99-
TimerData eventTimer = TimerData.of(namespace, timestamp, timestamp, TimeDomain.EVENT_TIME);
100-
TimerData procTimer = TimerData.of(namespace, timestamp, timestamp, TimeDomain.PROCESSING_TIME);
102+
TimerData eventTimer =
103+
TimerData.of(namespace, timestamp, timestamp, TimeDomain.EVENT_TIME, false);
104+
TimerData procTimer =
105+
TimerData.of(namespace, timestamp, timestamp, TimeDomain.PROCESSING_TIME, false);
101106
TimerData synchronizedProcTimer =
102-
TimerData.of(namespace, timestamp, timestamp, TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
107+
TimerData.of(
108+
namespace, timestamp, timestamp, TimeDomain.SYNCHRONIZED_PROCESSING_TIME, false);
103109

104110
assertThat(eventTimer, lessThan(procTimer));
105111
assertThat(eventTimer, lessThan(synchronizedProcTimer));
@@ -117,9 +123,9 @@ public void testCompareByNamespace() {
117123
StateNamespace secondWindowNs = StateNamespaces.window(windowCoder, secondWindow);
118124

119125
TimerData secondEventTime =
120-
TimerData.of(firstWindowNs, timestamp, timestamp, TimeDomain.EVENT_TIME);
126+
TimerData.of(firstWindowNs, timestamp, timestamp, TimeDomain.EVENT_TIME, false);
121127
TimerData thirdEventTime =
122-
TimerData.of(secondWindowNs, timestamp, timestamp, TimeDomain.EVENT_TIME);
128+
TimerData.of(secondWindowNs, timestamp, timestamp, TimeDomain.EVENT_TIME, false);
123129

124130
assertThat(secondEventTime, lessThan(thirdEventTime));
125131
}
@@ -130,9 +136,9 @@ public void testCompareByTimerId() {
130136
StateNamespace namespace = StateNamespaces.global();
131137

132138
TimerData id0Timer =
133-
TimerData.of("id0", namespace, timestamp, timestamp, TimeDomain.EVENT_TIME);
139+
TimerData.of("id0", namespace, timestamp, timestamp, TimeDomain.EVENT_TIME, false);
134140
TimerData id1Timer =
135-
TimerData.of("id1", namespace, timestamp, timestamp, TimeDomain.EVENT_TIME);
141+
TimerData.of("id1", namespace, timestamp, timestamp, TimeDomain.EVENT_TIME, false);
136142

137143
assertThat(id0Timer, lessThan(id1Timer));
138144
}

runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -400,17 +400,18 @@ public TestTimers(StateNamespace namespace) {
400400

401401
@Override
402402
public void setTimer(Instant timestamp, TimeDomain timeDomain) {
403-
timerInternals.setTimer(TimerData.of(namespace, timestamp, timestamp, timeDomain));
403+
timerInternals.setTimer(TimerData.of(namespace, timestamp, timestamp, timeDomain, false));
404404
}
405405

406406
@Override
407407
public void setTimer(Instant timestamp, Instant outputTimestamp, TimeDomain timeDomain) {
408-
timerInternals.setTimer(TimerData.of(namespace, timestamp, outputTimestamp, timeDomain));
408+
timerInternals.setTimer(
409+
TimerData.of(namespace, timestamp, outputTimestamp, timeDomain, false));
409410
}
410411

411412
@Override
412413
public void deleteTimer(Instant timestamp, TimeDomain timeDomain) {
413-
timerInternals.deleteTimer(TimerData.of(namespace, timestamp, timestamp, timeDomain));
414+
timerInternals.deleteTimer(TimerData.of(namespace, timestamp, timestamp, timeDomain, false));
414415
}
415416

416417
@Override

0 commit comments

Comments
 (0)