Тема обработки большого потока сообщений или других событий возникает достаточно часто.
Еще в далеком
2005 году на SECR я
делал доклад на эту тему под названием "Дизайн распределенных высокопроизводительных систем обработки сообщений".
Одной из идей, которые я пытался донести до слушателей, было то, что для увеличения надежности система должна быть построена так,
чтобы все сообщения обрабатывались пачками. При этом, размер пачки должен увеличиваться под нагрузкой.
Если накладной расход на обработку пачки имеет некую константу на всю пачку в целом, то такой подход позволить
системе уменьшать свои накладные расходы при увеличении нагрузки. Такая система элегантно справляется
с ситуациями экстремальной нагрузки.
Один из примеров это графические приложения: накладные расходы на перерисовку экрана обычно не очень
зависят от того, сколько элементов на экране поменялось. Бывает выгодней иногда перерисовывать почти всё,
чем пытаться вычислить какие именно области экрана поменялись и действительно требуют перерисовки.
К сожалению, часто сообщения поступают на вход системы по одному, а задача их группировки в пачки ложиться
на плечи программиста. Это то место, где очень легко совершить ошибку. В
Devexperts
мы давно выработали шаблон кода для таких ситуаций, которым я и хочу здесь поделиться.
Я набросал простейшую реализацию этого шаблона в классе BatchProcessor, код которого приведен ниже.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
public abstract class BatchProcessor {
private final Executor executor;
private final List batch = new ArrayList();
private final AtomicBoolean scheduled = new AtomicBoolean();
public BatchProcessor(Executor executor) {
this.executor = executor;
}
public void processItem(T item) {
addItem(item);
submitTask();
}
protected abstract void processBatch(List batch);
private synchronized void addItem(T item) {
batch.add(item);
}
private synchronized List retrieveBatch() {
List result = new ArrayList(batch);
batch.clear();
return result;
}
private synchronized boolean hasBatch() {
return !batch.isEmpty();
}
private void submitTask() {
if (scheduled.compareAndSet(false, true))
executor.execute(new Runnable() {
public void run() {
executeTask();
}
});
}
private void executeTask() {
try {
processBatch(retrieveBatch());
} finally {
scheduled.set(false);
if (hasBatch())
submitTask();
}
}
}
При конструировании этого класса надо указать Executor, в котором будут выполняться собранные пачки событий в
строке 11. Данная реализация накапливает все сообщения в списке batch при вызове метода processItem в
строке 15, а накопленную пачку сообщения передает в защищенный метод processBatch, объявленный в
строке 19, который необходимо перекрыть в наследнике для обработки пачки cообщений. В реальности
бывает так, что сообщения можно как-нибудь сгруппировать более эффективно, например, более новые сообщения могут
полностью отменять предыдущие и делать их обработку ненужными. Для реализации подобной логики можно поменять метод
addItem в
строке 22. Обратите внимание, что порядок действий в методе executeTask в
строке 45, а также try-finally блок очень важны для корректной работы этого шаблона во всех ситуациях.
У приведенного выше кода класса BatchProcessor есть несколько очень полезных свойств, которые сложно получить более простым кодом:
- Порядок сообщений всегда сохраняется.
- Метод processBatch никогда не вызывается параллельно.
- При вызове метода processBatch никакая критическая секция, семафор или лок не заняты. Из метода processBatch можно безопасно вызывать любые методы (в т.ч. processItem) без риска попасть во взаимную блокировку.
- В случае сбоя метода processBatch обработка следующих пачек продолжится.
Пример использования класса BatchProcessor:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import java.util.*;
import java.util.concurrent.*;
public class BatchProcessorTest extends BatchProcessor {
public static void main(String[] args) throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(10);
BatchProcessorTest test = new BatchProcessorTest(executor);
for (int i = 1; i <= 100; i++) {
test.processItem(i);
if (i % 10 == 0)
Thread.sleep(10);
}
executor.shutdown();
}
private BatchProcessorTest(Executor executor) {
super(executor);
}
protected void processBatch(List batch) {
System.out.println(Thread.currentThread().getName() + ": " + batch);
}
}
Этот код отправляет целые числа от 1 до 100 в метод processItem, засыпая на 10 ms после каждого 10-го числа,
и выводит на экран получившиеся пачки в методе processBatch.
Как и ожидается, результат запуска этого кода выглядит вот так:
pool-1-thread-1: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
pool-1-thread-2: [11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
pool-1-thread-3: [21, 22, 23, 24, 25, 26, 27, 28, 29, 30]
pool-1-thread-4: [31, 32, 33, 34, 35, 36, 37, 38, 39, 40]
pool-1-thread-5: [41, 42, 43, 44, 45, 46, 47, 48, 49, 50]
pool-1-thread-6: [51, 52, 53, 54, 55, 56, 57, 58, 59, 60]
pool-1-thread-7: [61, 62, 63, 64, 65, 66, 67, 68, 69, 70]
pool-1-thread-8: [71, 72, 73, 74, 75, 76, 77, 78, 79, 80]
pool-1-thread-9: [81, 82, 83, 84, 85, 86, 87, 88, 89, 90]
pool-1-thread-10: [91, 92, 93, 94, 95, 96, 97, 98, 99, 100]
То есть несмотря на то, что целые числа были добавлены по одному, они ушли на обработку пачками.
При недостатке свободных ресурсов в пуле потоков, размер пачки будет автоматически увеличиваться.
Что и требовалось получить.
UPDATE: Как улучшить дизайн этого кода, читайте
в следующей записи.