Disrutpor User Case

如何解决前置Handler批处理造成后置Handler阻塞问题

举例,生产消费链条: P -> C1 -> C2

假如P生产者一次性生产了1000万条数据,在默认情况下,C1必须消费完这1000万条数据后,C2才能继续接着处理这1000万条数据,在实际情况中,这是不合理的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private void processEvents() {
T event = null;
long nextSequence = sequence.get() + 1L;

while (true) {
try {
final long availableSequence = sequenceBarrier.waitFor(nextSequence);
if (batchStartAware != null && availableSequence >= nextSequence) {
batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
}

while (nextSequence <= availableSequence) {
event = dataProvider.get(nextSequence);
eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
nextSequence++;
}

sequence.set(availableSequence);
} catch (final TimeoutException e) {
notifyTimeout(sequence.get());
} catch (final AlertException ex) {
if (running.get() != RUNNING) {
break;
}
} catch (final Throwable ex) {
exceptionHandler.handleEventException(ex, nextSequence, event);
sequence.set(nextSequence);
nextSequence++;
}
}
}

在这段代码中,只有当前一个消费者调用了 sequence.set(availableSequence) 后,下一个消费者才能接着消费数据。

The call to sequence.set() is placed at the end of the callback to provide the maximum possible throughput.

The call to Sequence.set() is reasonably cheap but not as fast as a standard variable assignment as it requires atomicity and enforces ordering. However the use case that you mention is a common one therefore we have a specific type of event handler for it.

If you want to have an event handler that updates the sequence more frequently that the default, then you should use the SequenceReportingEventHandler.

1
2
3
4
5
6
7
8
9
10
11
12
13
public class MyHandler implements SequenceReportingEventHandler<Data> {
private final Buffer buffer = new Buffer();
private Sequence sequence;

@Override // The Sequence is passed in by the batch event processor
public void setSequenceCallback(Sequence sequence) {
this.sequence = sequence;
}

public void onEvent(Data data, long sequence, boolean endOfBatch) {
this.sequence.set(sequence);
}
}

https://groups.google.com/forum/#!searchin/lmax-disruptor/batch$20processor|sort:date/lmax-disruptor/_2m1WKrcXUo/cMTSMq6SAwAJ