您如何测试使用 JUnit 触发异步进程的方法?
我不知道如何让我的测试等待进程结束(它不完全是一个单元测试,它更像是一个集成测试,因为它涉及多个类而不仅仅是一个)。
TL;博士;不幸的是,还没有内置的解决方案(在撰写本文时,2022 年),因此您可以自由使用和/或实施任何适合您情况的解决方案。
例子
另一种方法是使用 CountDownLatch 类。
public class DatabaseTest {
/**
* Data limit
*/
private static final int DATA_LIMIT = 5;
/**
* Countdown latch
*/
private CountDownLatch lock = new CountDownLatch(1);
/**
* Received data
*/
private List<Data> receiveddata;
@Test
public void testDataRetrieval() throws Exception {
Database db = new MockDatabaseImpl();
db.getData(DATA_LIMIT, new DataCallback() {
@Override
public void onSuccess(List<Data> data) {
receiveddata = data;
lock.countDown();
}
});
lock.await(2000, TimeUnit.MILLISECONDS);
assertNotNull(receiveddata);
assertEquals(DATA_LIMIT, receiveddata.size());
}
}
注意您不能只将 syncronized 与常规对象一起用作锁,因为快速回调可以在调用锁的等待方法之前释放锁。请参阅 Joe Walnes 的this博文。
编辑由于@jtahlborn 和@Ring 的评论,删除了CountDownLatch 周围的同步块
您可以尝试使用 Awaitility 库。它使测试您正在谈论的系统变得容易。
CountDownLatch
(参见@Martin 的回答)更好。
如果您使用 CompletableFuture(Java 8 中引入)或 SettableFuture(来自 Google Guava),您可以在测试完成后立即完成,而不是等待预设的时间。您的测试将如下所示:
CompletableFuture<String> future = new CompletableFuture<>();
executorService.submit(new Runnable() {
@Override
public void run() {
future.complete("Hello World!");
}
});
assertEquals("Hello World!", future.get());
请注意,有一个库为 pre Java-8 提供 CompletableFuture,它甚至使用相同的名称(并提供所有相关的 Java-8 类),例如:net.sourceforge.streamsupport:streamsupport-minifuture:1.7.4 这很有用对于 Android 开发,即使我们使用 JDK-v11 构建,我们也希望保持代码与 Android-7 之前的设备兼容。
恕我直言,让单元测试创建或等待线程等是不好的做法。您希望这些测试在几秒钟内运行。这就是为什么我想提出一种两步法来测试异步进程。
测试您的异步进程是否正确提交。您可以模拟接受异步请求的对象,并确保提交的作业具有正确的属性等。测试您的异步回调是否在做正确的事情。在这里,您可以模拟最初提交的作业并假设它已正确初始化并验证您的回调是否正确。
我发现对测试异步方法非常有用的一种方法是在 object-to-test 的构造函数中注入一个 Executor
实例。在生产中,执行器实例被配置为异步运行,而在测试中它可以被模拟为同步运行。
所以假设我正在尝试测试异步方法 Foo#doAsync(Callback c)
,
class Foo {
private final Executor executor;
public Foo(Executor executor) {
this.executor = executor;
}
public void doAsync(Callback c) {
executor.execute(new Runnable() {
@Override public void run() {
// Do stuff here
c.onComplete(data);
}
});
}
}
在生产中,我将使用 Executors.newSingleThreadExecutor()
Executor 实例构造 Foo
,而在测试中我可能会使用执行以下操作的同步执行器来构造它 -
class SynchronousExecutor implements Executor {
@Override public void execute(Runnable r) {
r.run();
}
}
现在我对异步方法的 JUnit 测试非常干净——
@Test public void testDoAsync() {
Executor executor = new SynchronousExecutor();
Foo objectToTest = new Foo(executor);
Callback callback = mock(Callback.class);
objectToTest.doAsync(callback);
// Verify that Callback#onComplete was called using Mockito.
verify(callback).onComplete(any(Data.class));
// Assert that we got back the data that we expected.
assertEquals(expectedData, callback.getData());
}
WebClient
)的东西,则不起作用
测试线程/异步代码本质上没有任何问题,特别是如果线程是您正在测试的代码的重点。测试这些东西的一般方法是:
阻塞主测试线程
从其他线程捕获失败的断言
解锁主测试线程
重新抛出任何失败
但对于一项测试来说,这是很多样板文件。更好/更简单的方法是只使用 ConcurrentUnit:
final Waiter waiter = new Waiter();
new Thread(() -> {
doSomeWork();
waiter.assertTrue(true);
waiter.resume();
}).start();
// Wait for resume() to be called
waiter.await(1000);
与 CountdownLatch
方法相比,它的好处是它不那么冗长,因为任何线程中发生的断言失败都会正确地报告给主线程,这意味着测试应该失败。将 CountdownLatch
方法与 ConcurrentUnit 进行比较的文章是 here。
我还为那些想要了解更多细节的人写了一篇关于该主题的blog post。
如何按照 here 的描述调用 SomeObject.wait
和 notifyAll
或使用 Robotiums Solo.waitForCondition(...)
方法或使用 class i wrote 来执行此操作(有关如何使用,请参阅注释和测试类)
我找到了一个库 socket.io 来测试异步逻辑。使用 LinkedBlockingQueue 看起来很简单。这是example:
@Test(timeout = TIMEOUT)
public void message() throws URISyntaxException, InterruptedException {
final BlockingQueue<Object> values = new LinkedBlockingQueue<Object>();
socket = client();
socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() {
@Override
public void call(Object... objects) {
socket.send("foo", "bar");
}
}).on(Socket.EVENT_MESSAGE, new Emitter.Listener() {
@Override
public void call(Object... args) {
values.offer(args);
}
});
socket.connect();
assertThat((Object[])values.take(), is(new Object[] {"hello client"}));
assertThat((Object[])values.take(), is(new Object[] {"foo", "bar"}));
socket.disconnect();
}
使用 LinkedBlockingQueue 以 API 阻塞直到获得结果,就像同步方式一样。并设置超时以避免假设等待结果的时间过多。
JUnit 5 有 Assertions.assertTimeout(Duration, Executable)
/assertTimeoutPreemptively()
(请阅读各自的 Javadoc 以了解区别),而 Mockito 有 verify(mock, timeout(millisecs).times(x))
。
Assertions.assertTimeout(Duration.ofMillis(1000), () ->
myReactiveService.doSth().subscribe()
);
和:
Mockito.verify(myReactiveService,
timeout(1000).times(0)).doSth(); // cannot use never() here
超时在管道中可能是不确定的/脆弱的。所以要小心。
值得一提的是,Concurrency in Practice 中有非常有用的第 Testing Concurrent Programs
章描述了一些单元测试方法并给出了问题的解决方案。
如果测试结果是异步产生的,这就是我现在使用的。
public class TestUtil {
public static <R> R await(Consumer<CompletableFuture<R>> completer) {
return await(20, TimeUnit.SECONDS, completer);
}
public static <R> R await(int time, TimeUnit unit, Consumer<CompletableFuture<R>> completer) {
CompletableFuture<R> f = new CompletableFuture<>();
completer.accept(f);
try {
return f.get(time, unit);
} catch (InterruptedException | TimeoutException e) {
throw new RuntimeException("Future timed out", e);
} catch (ExecutionException e) {
throw new RuntimeException("Future failed", e.getCause());
}
}
}
使用静态导入,测试读起来还不错。 (注意,在这个例子中我开始一个线程来说明这个想法)
@Test
public void testAsync() {
String result = await(f -> {
new Thread(() -> f.complete("My Result")).start();
});
assertEquals("My Result", result);
}
如果未调用 f.complete
,则测试将在超时后失败。您也可以使用 f.completeExceptionally
提前失败。
这里有很多答案,但一个简单的答案是创建一个完整的 CompletableFuture 并使用它:
CompletableFuture.completedFuture("donzo")
所以在我的测试中:
this.exactly(2).of(mockEventHubClientWrapper).sendASync(with(any(LinkedList.class)));
this.will(returnValue(new CompletableFuture<>().completedFuture("donzo")));
我只是确保无论如何都会调用所有这些东西。如果您使用以下代码,则此技术有效:
CompletableFuture.allOf(calls.toArray(new CompletableFuture[0])).join();
当所有 CompletableFutures 完成时,它将直接穿过它!
尽可能避免使用并行线程进行测试(大多数情况下)。这只会使您的测试变得不稳定(有时通过,有时失败)。
只有当您需要调用其他库/系统时,您可能需要等待其他线程,在这种情况下,请始终使用 Awaitility 库而不是 Thread.sleep()
。
切勿在测试中只调用 get()
或 join()
,否则您的测试可能会在 CI 服务器上永远运行,以防将来永远无法完成。在调用 get()
之前,始终在测试中首先声明 isDone()
。对于 CompletionStage,即 .toCompletableFuture().isDone()
。
当您测试这样的非阻塞方法时:
public static CompletionStage<String> createGreeting(CompletableFuture<String> future) {
return future.thenApply(result -> "Hello " + result);
}
那么你不应该只通过在测试中传递一个完整的 Future 来测试结果,你还应该确保你的方法 doSomething()
不会通过调用 join()
或 get()
阻塞。如果您使用非阻塞框架,这一点尤其重要。
为此,请使用您设置为手动完成的未完成未来进行测试:
@Test
public void testDoSomething() throws Exception {
CompletableFuture<String> innerFuture = new CompletableFuture<>();
CompletableFuture<String> futureResult = createGreeting(innerFuture).toCompletableFuture();
assertFalse(futureResult.isDone());
// this triggers the future to complete
innerFuture.complete("world");
assertTrue(futureResult.isDone());
// futher asserts about fooResult here
assertEquals(futureResult.get(), "Hello world");
}
这样,如果将 future.join()
添加到 doSomething(),测试将失败。
如果您的服务使用 thenApplyAsync(..., executorService)
中的 ExecutorService,那么在您的测试中注入一个单线程 ExecutorService,例如来自 guava 的那个:
ExecutorService executorService = Executors.newSingleThreadExecutor();
如果您的代码使用 thenApplyAsync(...)
之类的 forkJoinPool,请重写代码以使用 ExecutorService(有很多好的理由),或者使用 Awaitility。
为了缩短示例,我在测试中将 BarService 设置为实现为 Java8 lambda 的方法参数,通常它是您将模拟的注入引用。
对于那里的所有 Spring 用户,这就是我现在通常进行集成测试的方式,其中涉及异步行为:
当异步任务(例如 I/O 调用)完成时,在生产代码中触发应用程序事件。大多数情况下,无论如何处理生产中异步操作的响应都需要此事件。
有了这个事件,您就可以在测试用例中使用以下策略:
执行被测系统 监听事件并确保事件已触发 做你的断言
要打破这一点,您首先需要触发某种域事件。我在这里使用 UUID 来识别已完成的任务,但你当然可以自由地使用其他东西,只要它是唯一的。
(请注意,以下代码片段还使用 Lombok 注释来去除样板代码)
@RequiredArgsConstructor
class TaskCompletedEvent() {
private final UUID taskId;
// add more fields containing the result of the task if required
}
生产代码本身通常如下所示:
@Component
@RequiredArgsConstructor
class Production {
private final ApplicationEventPublisher eventPublisher;
void doSomeTask(UUID taskId) {
// do something like calling a REST endpoint asynchronously
eventPublisher.publishEvent(new TaskCompletedEvent(taskId));
}
}
然后我可以使用 Spring @EventListener
在测试代码中捕获已发布的事件。事件监听器涉及更多一点,因为它必须以线程安全的方式处理两种情况:
生产代码比测试用例快并且在测试用例检查事件之前事件已经触发,或者测试用例比生产代码快并且测试用例必须等待事件。
如此处其他答案所述,CountDownLatch
用于第二种情况。另请注意,事件处理程序方法上的 @Order
注释确保在生产中使用的任何其他事件侦听器之后调用此事件处理程序方法。
@Component
class TaskCompletionEventListener {
private Map<UUID, CountDownLatch> waitLatches = new ConcurrentHashMap<>();
private List<UUID> eventsReceived = new ArrayList<>();
void waitForCompletion(UUID taskId) {
synchronized (this) {
if (eventAlreadyReceived(taskId)) {
return;
}
checkNobodyIsWaiting(taskId);
createLatch(taskId);
}
waitForEvent(taskId);
}
private void checkNobodyIsWaiting(UUID taskId) {
if (waitLatches.containsKey(taskId)) {
throw new IllegalArgumentException("Only one waiting test per task ID supported, but another test is already waiting for " + taskId + " to complete.");
}
}
private boolean eventAlreadyReceived(UUID taskId) {
return eventsReceived.remove(taskId);
}
private void createLatch(UUID taskId) {
waitLatches.put(taskId, new CountDownLatch(1));
}
@SneakyThrows
private void waitForEvent(UUID taskId) {
var latch = waitLatches.get(taskId);
latch.await();
}
@EventListener
@Order
void eventReceived(TaskCompletedEvent event) {
var taskId = event.getTaskId();
synchronized (this) {
if (isSomebodyWaiting(taskId)) {
notifyWaitingTest(taskId);
} else {
eventsReceived.add(taskId);
}
}
}
private boolean isSomebodyWaiting(UUID taskId) {
return waitLatches.containsKey(taskId);
}
private void notifyWaitingTest(UUID taskId) {
var latch = waitLatches.remove(taskId);
latch.countDown();
}
}
最后一步是在测试用例中执行被测系统。我在这里使用带有 JUnit 5 的 SpringBoot 测试,但这对于使用 Spring 上下文的所有测试应该是一样的。
@SpringBootTest
class ProductionIntegrationTest {
@Autowired
private Production sut;
@Autowired
private TaskCompletionEventListener listener;
@Test
void thatTaskCompletesSuccessfully() {
var taskId = UUID.randomUUID();
sut.doSomeTask(taskId);
listener.waitForCompletion(taskId);
// do some assertions like looking into the DB if value was stored successfully
}
}
请注意,与此处的其他答案相比,如果您并行执行测试并且多个线程同时执行异步代码,则此解决方案也将起作用。
如果您想测试逻辑,请不要异步测试它。
例如,测试这个适用于异步方法结果的代码。
public class Example {
private Dependency dependency;
public Example(Dependency dependency) {
this.dependency = dependency;
}
public CompletableFuture<String> someAsyncMethod(){
return dependency.asyncMethod()
.handle((r,ex) -> {
if(ex != null) {
return "got exception";
} else {
return r.toString();
}
});
}
}
public class Dependency {
public CompletableFuture<Integer> asyncMethod() {
// do some async stuff
}
}
在测试中模拟具有同步实现的依赖项。单元测试是完全同步的,运行时间为 150 毫秒。
public class DependencyTest {
private Example sut;
private Dependency dependency;
public void setup() {
dependency = Mockito.mock(Dependency.class);;
sut = new Example(dependency);
}
@Test public void success() throws InterruptedException, ExecutionException {
when(dependency.asyncMethod()).thenReturn(CompletableFuture.completedFuture(5));
// When
CompletableFuture<String> result = sut.someAsyncMethod();
// Then
assertThat(result.isCompletedExceptionally(), is(equalTo(false)));
String value = result.get();
assertThat(value, is(equalTo("5")));
}
@Test public void failed() throws InterruptedException, ExecutionException {
// Given
CompletableFuture<Integer> c = new CompletableFuture<Integer>();
c.completeExceptionally(new RuntimeException("failed"));
when(dependency.asyncMethod()).thenReturn(c);
// When
CompletableFuture<String> result = sut.someAsyncMethod();
// Then
assertThat(result.isCompletedExceptionally(), is(equalTo(false)));
String value = result.get();
assertThat(value, is(equalTo("got exception")));
}
}
您不测试异步行为,但您可以测试逻辑是否正确。
我更喜欢使用等待和通知。它简单明了。
@Test
public void test() throws Throwable {
final boolean[] asyncExecuted = {false};
final Throwable[] asyncThrowable= {null};
// do anything async
new Thread(new Runnable() {
@Override
public void run() {
try {
// Put your test here.
fail();
}
// lets inform the test thread that there is an error.
catch (Throwable throwable){
asyncThrowable[0] = throwable;
}
// ensure to release asyncExecuted in case of error.
finally {
synchronized (asyncExecuted){
asyncExecuted[0] = true;
asyncExecuted.notify();
}
}
}
}).start();
// Waiting for the test is complete
synchronized (asyncExecuted){
while(!asyncExecuted[0]){
asyncExecuted.wait();
}
}
// get any async error, including exceptions and assertationErrors
if(asyncThrowable[0] != null){
throw asyncThrowable[0];
}
}
基本上,我们需要创建一个最终的 Array 引用,以便在匿名内部类中使用。我宁愿创建一个 boolean[],因为我可以设置一个值来控制我们是否需要 wait()。当一切都完成后,我们只需释放 asyncExecuted。
假设您有以下代码:
public void method() {
CompletableFuture.runAsync(() -> {
//logic
//logic
//logic
//logic
});
}
尝试将其重构为如下内容:
public void refactoredMethod() {
CompletableFuture.runAsync(this::subMethod);
}
private void subMethod() {
//logic
//logic
//logic
//logic
}
之后,以这种方式测试 subMethod:
org.powermock.reflect.Whitebox.invokeMethod(classInstance, "subMethod");
这不是一个完美的解决方案,但它会测试异步执行中的所有逻辑。
subMethod
提取到另一个类,则此解决方案会更好,因此可以在没有 powermock/reflection 的情况下进行测试
await()
调用中删除超时。使用CountDownLatch
应该保证您将“尽快”获得结果,而无需渲染它。