88 lines
2.5 KiB
Kotlin
88 lines
2.5 KiB
Kotlin
package de.timklge.karooheadwind
|
|
|
|
import io.hammerhead.karooext.KarooSystemService
|
|
import io.hammerhead.karooext.models.ActiveRidePage
|
|
import io.hammerhead.karooext.models.OnLocationChanged
|
|
import io.hammerhead.karooext.models.OnNavigationState
|
|
import io.hammerhead.karooext.models.OnStreamState
|
|
import io.hammerhead.karooext.models.RideState
|
|
import io.hammerhead.karooext.models.StreamState
|
|
import kotlinx.coroutines.channels.awaitClose
|
|
import kotlinx.coroutines.channels.trySendBlocking
|
|
import kotlinx.coroutines.delay
|
|
import kotlinx.coroutines.flow.Flow
|
|
import kotlinx.coroutines.flow.callbackFlow
|
|
import kotlinx.coroutines.flow.conflate
|
|
import kotlinx.coroutines.flow.map
|
|
import kotlinx.coroutines.flow.sample
|
|
import kotlinx.coroutines.flow.transform
|
|
|
|
fun KarooSystemService.streamDataFlow(dataTypeId: String): Flow<StreamState> {
|
|
return callbackFlow {
|
|
val listenerId = addConsumer(OnStreamState.StartStreaming(dataTypeId)) { event: OnStreamState ->
|
|
trySendBlocking(event.state)
|
|
}
|
|
awaitClose {
|
|
removeConsumer(listenerId)
|
|
}
|
|
}
|
|
}
|
|
|
|
fun KarooSystemService.streamLocation(): Flow<OnLocationChanged> {
|
|
return callbackFlow {
|
|
val listenerId = addConsumer { event: OnLocationChanged ->
|
|
trySendBlocking(event)
|
|
}
|
|
awaitClose {
|
|
removeConsumer(listenerId)
|
|
}
|
|
}
|
|
}
|
|
|
|
fun KarooSystemService.streamNavigationState(): Flow<OnNavigationState> {
|
|
return callbackFlow {
|
|
val listenerId = addConsumer { event: OnNavigationState ->
|
|
trySendBlocking(event)
|
|
}
|
|
awaitClose {
|
|
removeConsumer(listenerId)
|
|
}
|
|
}
|
|
}
|
|
|
|
fun KarooSystemService.streamRideState(): Flow<RideState> {
|
|
return callbackFlow {
|
|
val listenerId = addConsumer { event: RideState ->
|
|
trySendBlocking(event)
|
|
}
|
|
awaitClose {
|
|
removeConsumer(listenerId)
|
|
}
|
|
}
|
|
}
|
|
|
|
fun<T> Flow<T>.throttle(timeout: Long): Flow<T> = this
|
|
.conflate()
|
|
.transform {
|
|
emit(it)
|
|
delay(timeout)
|
|
}
|
|
|
|
fun KarooSystemService.streamActiveRidePage(): Flow<ActiveRidePage> {
|
|
return callbackFlow {
|
|
val listenerId = addConsumer { activeRidePage: ActiveRidePage ->
|
|
trySendBlocking(activeRidePage)
|
|
}
|
|
awaitClose {
|
|
removeConsumer(listenerId)
|
|
}
|
|
}
|
|
}
|
|
|
|
fun KarooSystemService.streamDatatypeIsVisible(
|
|
datatype: String,
|
|
): Flow<Boolean> {
|
|
return streamActiveRidePage().map { page ->
|
|
page.page.elements.any { it.dataTypeId == datatype }
|
|
}
|
|
} |