Expose the JVB stress level as a stat (#1440)

* support division in JvbLoadMeasurement

* expose the current stress level in load manager

* always create load manager/sampler, only enable/disable reducer

* update jmc

* change semantics of load management enabled config to be reducer enabled, add tests

* expose stress level in stats
pull/1445/head
bbaldino 5 years ago committed by GitHub
parent aaec846f42
commit 8a0045bbc3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 56
      jvb/src/main/java/org/jitsi/videobridge/Videobridge.java
  2. 4
      jvb/src/main/java/org/jitsi/videobridge/stats/VideobridgeStatistics.java
  3. 61
      jvb/src/main/kotlin/org/jitsi/videobridge/load_management/JvbLoadManager.kt
  4. 2
      jvb/src/main/kotlin/org/jitsi/videobridge/load_management/JvbLoadMeasurement.kt
  5. 4
      jvb/src/main/kotlin/org/jitsi/videobridge/load_management/PacketRateLoadSampler.kt
  6. 7
      jvb/src/main/kotlin/org/jitsi/videobridge/load_management/PacketRateMeasurement.kt
  7. 3
      jvb/src/main/resources/reference.conf
  8. 54
      jvb/src/test/kotlin/org/jitsi/videobridge/load_management/JvbLoadManagerTest.kt
  9. 38
      jvb/src/test/kotlin/org/jitsi/videobridge/load_management/PacketRateMeasurementTest.kt
  10. 2
      pom.xml

@ -148,29 +148,29 @@ public class Videobridge
public Videobridge()
{
videobridgeExpireThread = new VideobridgeExpireThread(this);
if (JvbLoadManager.isEnabled())
{
logger.info("Starting JVB load management task");
jvbLoadManager = new JvbLoadManager<>(
PacketRateMeasurement.getLoadedThreshold(),
PacketRateMeasurement.getRecoveryThreshold(),
new LastNReducer(
this::getConferences,
JvbLastNKt.jvbLastNSingleton
)
);
loadSamplerTask = TaskPools.SCHEDULED_POOL.scheduleAtFixedRate(
new PacketRateLoadSampler(this, jvbLoadManager),
0,
10,
TimeUnit.SECONDS
);
}
else
{
jvbLoadManager = null;
loadSamplerTask = null;
}
jvbLoadManager = new JvbLoadManager<>(
PacketRateMeasurement.getLoadedThreshold(),
PacketRateMeasurement.getRecoveryThreshold(),
new LastNReducer(
this::getConferences,
JvbLastNKt.jvbLastNSingleton
)
);
loadSamplerTask = TaskPools.SCHEDULED_POOL.scheduleAtFixedRate(
new PacketRateLoadSampler(
this,
(loadMeasurement) -> {
// Update the load manager with the latest measurement
jvbLoadManager.loadUpdate(loadMeasurement);
// Update the stats with the latest stress level
getStatistics().stressLevel = jvbLoadManager.getCurrentStressLevel();
return Unit.INSTANCE;
}
),
0,
10,
TimeUnit.SECONDS
);
}
/**
@ -569,10 +569,7 @@ public class Videobridge
debugState.put("time", System.currentTimeMillis());
debugState.put("health", getHealthStatus());
if (jvbLoadManager != null)
{
debugState.put("load-management", jvbLoadManager.getStats());
}
debugState.put("load-management", jvbLoadManager.getStats());
debugState.put(Endpoint.overallAverageBridgeJitter.name, Endpoint.overallAverageBridgeJitter.get());
JSONObject conferences = new JSONObject();
@ -804,5 +801,10 @@ public class Videobridge
* wasn't (at the time of expiration).
*/
public AtomicInteger dtlsFailedEndpoints = new AtomicInteger();
/**
* The stress level for this bridge
*/
public Double stressLevel = 0.0;
}
}

@ -435,6 +435,10 @@ public class VideobridgeStatistics
EPS_NO_MSG_TRANSPORT_AFTER_DELAY,
jvbStats.numEndpointsNoMessageTransportAfterDelay.get()
);
unlockedSetStat(
"stress_level",
jvbStats.stressLevel
);
unlockedSetStat(CONFERENCES, conferences);
unlockedSetStat(OCTO_CONFERENCES, octoConferences);
unlockedSetStat(INACTIVE_CONFERENCES, inactiveConferences);

@ -36,46 +36,62 @@ class JvbLoadManager<T : JvbLoadMeasurement> @JvmOverloads constructor(
) {
private val logger = createLogger(minLogLevel = Level.ALL)
val reducerEnabled: Boolean by config("videobridge.load-management.reducer-enabled".from(JitsiConfig.newConfig))
private var lastReducerTime: Instant = NEVER
private var state: State = State.NOT_OVERLOADED
private var mostRecentLoadMeasurement: T? = null
fun loadUpdate(loadMeasurement: T) {
logger.cdebug { "Got a load measurement of $loadMeasurement" }
mostRecentLoadMeasurement = loadMeasurement
val now = clock.instant()
if (loadMeasurement.getLoad() >= jvbLoadThreshold.getLoad()) {
state = State.OVERLOADED
logger.info("Load measurement $loadMeasurement is above threshold of $jvbLoadThreshold")
if (canRunReducer(now)) {
logger.info("Running load reducer")
loadReducer.reduceLoad()
lastReducerTime = now
} else {
logger.info("Load reducer ran at $lastReducerTime, which is within ${loadReducer.impactTime()} " +
"of now, not running reduce")
if (reducerEnabled) {
logger.info("Load measurement $loadMeasurement is above threshold of $jvbLoadThreshold")
if (canRunReducer(now)) {
logger.info("Running load reducer")
loadReducer.reduceLoad()
lastReducerTime = now
} else {
logger.info("Load reducer ran at $lastReducerTime, which is within " +
"${loadReducer.impactTime()} of now, not running reduce")
}
}
} else {
state = State.NOT_OVERLOADED
if (loadMeasurement.getLoad() < jvbRecoveryThreshold.getLoad()) {
if (canRunReducer(now)) {
if (loadReducer.recover()) {
logger.info("Recovery ran after a load measurement of $loadMeasurement (which was below " +
"threshold of $jvbRecoveryThreshold) was received")
lastReducerTime = now
if (reducerEnabled) {
if (loadMeasurement.getLoad() < jvbRecoveryThreshold.getLoad()) {
if (canRunReducer(now)) {
if (loadReducer.recover()) {
logger.info("Recovery ran after a load measurement of $loadMeasurement (which was " +
"below threshold of $jvbRecoveryThreshold) was received")
lastReducerTime = now
} else {
logger.cdebug { "Recovery had no work to do" }
}
} else {
logger.cdebug { "Recovery had no work to do" }
logger.cdebug {
"Load measurement $loadMeasurement is below recovery threshold, but load reducer " +
"ran at $lastReducerTime, which is within ${loadReducer.impactTime()} of now, " +
"not running recover"
}
}
} else {
logger.cdebug { "Load measurement $loadMeasurement is below recovery threshold, but load reducer " +
"ran at $lastReducerTime, which is within ${loadReducer.impactTime()} of now, not running " +
"recover" }
}
}
}
}
fun getCurrentStressLevel(): Double =
mostRecentLoadMeasurement?.div(jvbLoadThreshold) ?: 0.0
fun getStats() = OrderedJsonObject().apply {
put("state", state.toString())
put("stress", getCurrentStressLevel().toString())
put("reducer_enabled", reducerEnabled.toString())
put("reducer", loadReducer.getStats())
}
@ -86,11 +102,4 @@ class JvbLoadManager<T : JvbLoadMeasurement> @JvmOverloads constructor(
OVERLOADED,
NOT_OVERLOADED
}
companion object {
val enabled: Boolean by config("videobridge.load-management.enabled".from(JitsiConfig.newConfig))
@JvmStatic
fun isEnabled() = enabled
}
}

@ -23,6 +23,8 @@ package org.jitsi.videobridge.load_management
interface JvbLoadMeasurement {
fun getLoad(): Double
operator fun div(other: JvbLoadMeasurement): Double
companion object {
const val CONFIG_BASE = "videobridge.load-management.load-measurements"
}

@ -20,7 +20,7 @@ import org.jitsi.videobridge.Videobridge
class PacketRateLoadSampler(
private val videobridge: Videobridge,
private val jvbLoadManager: JvbLoadManager<PacketRateMeasurement>
private val newMeasurementHandler: (PacketRateMeasurement) -> Unit
) : Runnable {
override fun run() {
@ -33,6 +33,6 @@ class PacketRateLoadSampler(
}
}
}
jvbLoadManager.loadUpdate(PacketRateMeasurement(totalPacketRate))
newMeasurementHandler(PacketRateMeasurement(totalPacketRate))
}
}

@ -22,6 +22,13 @@ import org.jitsi.metaconfig.config
class PacketRateMeasurement(private val packetRate: Long) : JvbLoadMeasurement {
override fun getLoad(): Double = packetRate.toDouble()
override fun div(other: JvbLoadMeasurement): Double {
if (other !is PacketRateMeasurement) {
throw UnsupportedOperationException("Can only divide load measurements of same type")
}
return packetRate / other.packetRate.toDouble()
}
override fun toString(): String = "RTP packet rate (up + down) of $packetRate pps"
companion object {

@ -139,7 +139,8 @@ videobridge {
send-queue-size=1024
}
load-management {
enabled = false
# Whether or not the reducer will be enabled to take actions to mitigate load
reducer-enabled = false
load-measurements {
packet-rate {
# The packet rate at which we'll consider the bridge overloaded

@ -18,10 +18,13 @@ package org.jitsi.videobridge.load_management
import io.kotest.core.spec.IsolationMode
import io.kotest.core.spec.style.ShouldSpec
import io.kotest.matchers.shouldBe
import io.mockk.Called
import io.mockk.every
import io.mockk.mockk
import io.mockk.verify
import org.jitsi.config.setNewConfig
import org.jitsi.metaconfig.MetaconfigSettings
import org.jitsi.test.time.FakeClock
import org.jitsi.utils.mins
import org.jitsi.utils.secs
@ -34,12 +37,17 @@ class JvbLoadManagerTest : ShouldSpec({
}
val clock = FakeClock()
val loadManager = JvbLoadManager<MockLoadMeasurement>(
MockLoadMeasurement(10.0),
MockLoadMeasurement(7.0),
reducer,
clock
)
val loadManager = createWithConfig("""
videobridge.load-management.reducer-enabled=true
""".trimIndent()
) {
JvbLoadManager(
MockLoadMeasurement(10.0),
MockLoadMeasurement(7.0),
reducer,
clock
)
}
context("a load update") {
context("with a load which isn't overloaded") {
@ -100,10 +108,44 @@ class JvbLoadManagerTest : ShouldSpec({
}
}
}
context("the stress level") {
should("be 0 if no measurement has been received") {
loadManager.getCurrentStressLevel() shouldBe 0.0
}
should("update with every load measurement") {
loadManager.loadUpdate(MockLoadMeasurement(1.0))
loadManager.getCurrentStressLevel() shouldBe .1
loadManager.loadUpdate(MockLoadMeasurement(3.0))
loadManager.getCurrentStressLevel() shouldBe .3
loadManager.loadUpdate(MockLoadMeasurement(11.0))
loadManager.getCurrentStressLevel() shouldBe 1.1
}
}
})
class MockLoadMeasurement(var loadMeasurement: Double) : JvbLoadMeasurement {
override fun getLoad(): Double = loadMeasurement
override fun div(other: JvbLoadMeasurement): Double {
other as MockLoadMeasurement
return loadMeasurement / other.loadMeasurement
}
override fun toString(): String = "Mock load measurement of $loadMeasurement"
}
/**
* A helper function to run the given [block] with the given [config] in place and
* return the result.
*
* This assumes that the result of [block] should be created with exactly the given
* [config] in place, and all config values will be retrieved immediately.
*/
private fun <T : Any> createWithConfig(config: String, block: () -> T): T {
setNewConfig(config, true, "name")
MetaconfigSettings.retrieveValuesImmediately = true
val result = block()
MetaconfigSettings.retrieveValuesImmediately = false
setNewConfig("", true, "name")
return result
}

@ -0,0 +1,38 @@
/*
* Copyright @ 2018 - present 8x8, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jitsi.videobridge.load_management
import io.kotest.assertions.throwables.shouldThrow
import io.kotest.core.spec.style.ShouldSpec
import io.kotest.matchers.shouldBe
class PacketRateMeasurementTest : ShouldSpec({
context("division") {
context("between two PacketRateMeasurements") {
should("yield the correct result") {
PacketRateMeasurement(10) / PacketRateMeasurement(2) shouldBe 5.0
}
}
context("between a PacketRateMeasurement and another measurement type") {
should("throw an exception") {
shouldThrow<UnsupportedOperationException> {
PacketRateMeasurement(10) / MockLoadMeasurement(1.1)
}
}
}
}
})

@ -64,7 +64,7 @@
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>jitsi-metaconfig</artifactId>
<version>30cdd3e22b</version>
<version>315a3cec35</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>

Loading…
Cancel
Save