Store element in current coroutine context
Hello!
I want to store a temporary Redis connection in the current coroutine context so that I can reuse it later and avoid opening multiple new connections.
Previously, I had this code:
public suspend inline fun <T> connect(crossinline body: suspend (RedisCoroutinesCommands<ByteArray, ByteArray>) -> T): T {
    contract {
        callsInPlace(body, InvocationKind.EXACTLY_ONCE)
    }
    // Always acquire a new connection from the pool.
    return connectionManager.poolStateful.acquire {
        val newConnection = it.coroutines()
        body(newConnection)
    }
}
And it worked.
I changed it to store the current connection in the context, like this:
public suspend inline fun <T> connect(crossinline body: suspend (RedisCoroutinesCommands<ByteArray, ByteArray>) -> T): T {
    contract {
        callsInPlace(body, InvocationKind.EXACTLY_ONCE)
    }
    val currentContext = currentCoroutineContext()
    // Reuse the connection already stored in the coroutine context, if any.
    val connection = currentContext[RedisConnection]
    if (connection != null) return body(connection.connection)
    // Otherwise acquire a new connection and expose it through the context.
    return connectionManager.poolStateful.acquire {
        val newConnection = it.coroutines()
        withContext(currentContext + RedisConnection(newConnection)) {
            body(newConnection)
        }
    }
}
Now I get a new error:
(The error message is in the thread reply.)
How can I use the new coroutine context together with a flow?
1 Reply
Error:
Flow invariant is violated:
Flow was collected in [RunningInRunTest, kotlinx.coroutines.test.TestCoroutineScheduler@458031da, kotlinx.coroutines.test.TestScopeKt$TestScope$$inlined$CoroutineExceptionHandler$1@7be94cd6, CoroutineId(9), "coroutine#9":StandaloneCoroutine{Active}@20cff21e, StandardTestDispatcher[scheduler=kotlinx.coroutines.test.TestCoroutineScheduler@458031da]],
but emission happened in [RunningInRunTest, kotlinx.coroutines.test.TestCoroutineScheduler@458031da, kotlinx.coroutines.test.TestScopeKt$TestScope$$inlined$CoroutineExceptionHandler$1@7be94cd6, CoroutineId(9), com.github.rushyverse.core.cache.RedisConnectionImpl@463045fb, kotlinx.coroutines.UndispatchedMarker@27ab206, "coroutine#9":UndispatchedCoroutine{Active}@3344d163, StandardTestDispatcher[scheduler=kotlinx.coroutines.test.TestCoroutineScheduler@458031da]].
Please refer to 'flow' documentation or use 'flowOn' instead
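The two contexts in the error differ by the `RedisConnectionImpl` element and the `UndispatchedCoroutine`, which suggests that `emit` is being called inside the `withContext` block of `connect`. A `flow { }` builder requires emissions to happen in the collector's context, so this trips the invariant check. One possible workaround, sketched here under assumptions, is to build the flow with `channelFlow` and call `send` instead of `emit`, since `channelFlow` allows sending from a different context. `FakeConnection` and the simplified, non-inline `connect` below are hypothetical stand-ins for the code in the question, not the real Redis types.

```kotlin
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext

// Hypothetical stand-in for the RedisConnection context element from the question.
class FakeConnection(val name: String) : AbstractCoroutineContextElement(FakeConnection) {
    companion object Key : CoroutineContext.Key<FakeConnection>
}

// Simplified connect(): reuse the connection found in the context, or install a new one.
suspend fun <T> connect(body: suspend (FakeConnection) -> T): T {
    val existing = currentCoroutineContext()[FakeConnection]
    if (existing != null) return body(existing)
    val fresh = FakeConnection("fresh")
    // withContext merges the element into the current context.
    return withContext(fresh) { body(fresh) }
}

// channelFlow permits send() from a context other than the collector's,
// so sending inside withContext does not violate the flow invariant.
fun names(): Flow<String> = channelFlow {
    connect { outer ->
        send(outer.name)
        // The nested call finds the element installed above and reuses it.
        connect { inner -> send("reused=${inner === outer}") }
    }
}

fun main() = runBlocking {
    println(names().toList()) // prints [fresh, reused=true]
}
```

The trade-off is that `channelFlow` goes through a buffered channel, so it is slightly heavier than a plain `flow { }`. If the connection can be acquired outside the flow, applying `flowOn` with the context element to the upstream flow may be another option, as the error message hints.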