Service monitoring with python asyncio
In this post, I will give an example of using python asyncio package to do monitoring. The asyncio APIs are slightly different between python version 3.6 and 3.7. Here I use version 3.7 since it requires less boilerplate. All source code can be found in this repo.
The starting point is a serial implementation of monitor. For simplicity, I also assume the health checks are so fast that timing drift is not a concern.
tic = time.monotonic()
def check_health(i: int, predicate: Callable) -> bool:
"""
Return True if something goes wrong
"""
print(f'check health {i} @{time.monotonic() - tic:0.1f}')
return predicate()
def main():
any_problem = lambda: False
while True:
if check_health(1, any_problem):
break
if check_health(2, any_problem):
break
if check_health(3, any_problem):
break
time.sleep(3)
do_cleanup()
Here the main
function contains an event loop
which stops if any health check fails.
The dummy function any_problem
keeps the loop alive forever.
In real situations, different health checks will be applied.
Running this code, we see that the health checks run together periodically, as expected.
chronos (master *+) async-monitor $ python3 serial.py
check health 1 @0.0
check health 2 @0.0
check health 3 @0.0
check health 1 @3.0
check health 2 @3.0
check health 3 @3.0
check health 1 @6.0
check health 2 @6.0
check health 3 @6.0
...
More realistically, we will do different health checks at different intervals. This simple requirement actually has 3 components
- health checks should run periodically
- they don’t block each other
- one failing check can (optionally) stop other checks
The first component is basically an event loop, as in the serial code. The second component can be done in various ways. An obvious choice is to run different checks in different threads (which I won’t show here). The third component requires communication either between different health checks or between the health checks and the main event loop. Again, there are various ways to do it. For example, we can use Event or Condition for the coordination.
Before going to the python asyncio implementation, let’s first look at a golang implementation. This code snippet shares similar overall structure with the later python asyncio code, and reveals some implementation details hidden in the python APIs.
var tic = time.Now()
func runAtInterval(dt time.Duration, checker func() bool, stop chan<- struct{}) {
ticker := time.NewTicker(dt)
defer ticker.Stop()
for {
select {
case <-ticker.C:
{
fmt.Printf("check health %s @%.3s\n", dt, time.Now().Sub(tic))
bad := checker()
if bad {
stop <- struct{}{}
}
}
}
}
}
func main() {
ch1 := make(chan struct{})
ch2 := make(chan struct{})
ch3 := make(chan struct{})
anyProblem := func() bool { return false }
doomSoon := func() bool { return true }
go runAtInterval(time.Second*3, anyProblem, ch1)
go runAtInterval(time.Second*5, anyProblem, ch2)
go runAtInterval(time.Second*16, doomSoon, ch3)
select {
case <-ch1:
fmt.Println("Problem detected on first checker")
case <-ch2:
fmt.Println("Problem detected on second checker")
case <-ch3:
fmt.Println("Problem detected on third checker")
}
doCleanup()
}
Here the runAtInterval
function runs a ticker to do the health check periodically (the first component).
Three of they run concurrently in separate goroutines (the second component).
When a problem is detected, a stop signal is sent back to the main
goroutine via a channel.
At that point, the select
statement unblocks and cleanup takes place.
The unbuffered channels together with the select
statement in golang make it very easy to reason about the order of events.
This code actually has a potential problem in the third component.
Suppose doCleanup
takes some time to complete.
While it’s doing the work, the other health checks are still running,
which may or may not be desirable.
If terminating pending health checks is preferred, we can pass another channel to runAtInterval
to signal the cancellation - a second case
for the select
statement.
Since the focus of this post is not on golang, I won’t elaborate on further variations.
Running this code, we get the interleaving health checks and the mock problem at 16 seconds.
chronos (master *) async-monitor $ go run monitor.go
check health 3s @3.0
check health 5s @5.0
check health 3s @6.0
check health 3s @9.0
check health 5s @10.
check health 3s @12.
check health 5s @15.
check health 3s @15.
check health 16s @16.
Problem detected on third checker
This overall code structure can be mapped to the python asyncio version almost exactly.
tic = time.monotonic()
async def run_at_interval(t: float, predicate: Callable):
while True:
await asyncio.sleep(t)
print(f'check health {t}: @{time.monotonic() - tic:0.1f}')
feedback = predicate()
if feedback:
return feedback
async def main():
any_problem = lambda: False
t1 = asyncio.create_task(run_at_interval(3, any_problem))
t2 = asyncio.create_task(run_at_interval(5, any_problem))
timeout = asyncio.create_task(asyncio.sleep(16))
done, pending = await asyncio.wait({t1, t2, timeout},
return_when=asyncio.FIRST_COMPLETED)
for t in pending:
t.cancel()
feedback = await done.pop()
return feedback
if __name__ == '__main__':
feedback = asyncio.run(main())
do_cleanup(feedback)
Here the function run_at_interval
loops forever until some health check fails.
In the main
function, we create 2 health checks with different periods, as
well as a timeout
event.
The wait
function plays the same role as the select
statement in golang.
When the first problem is detected, we further cancel
the other health checks.
Here I also use a feedback
object to facilitate the cleanup process.
If you are new to async/await
, this code may look quite different from any “regular” python code.
But in fact, most of it are just boilerplate, e.g., creating tasks, wait
ing for tasks.
My quick and dirty way to understand async/await
is as follows:
async
labels the function to be used as coroutine- The thing after
await
should be a coroutine that is not CPU demanding, for example, IO bound operations, orsleep()
as in this example. await
is likeyield from
. On one hand, it labels the points where a function could give back control to its caller. On the other hand, it takes care of retrieving data from layers of coroutines.- Using
await
keyword does not indicate concurrency, just like using coroutine does not indicate concurrency. - By creating
Task
s out of coroutines, we can run them concurrently. In other words,Task
does not block. As a result, they may not finish at any point of time. And there are APIs to query and manipulate its internal states.
If you are not familiar with coroutine,
search “David Beazley” on Youtube or google.
At high level, it is a mechanism to make functions run half way and give back control to their callers. In python, it is implemented with the keyword yield
and functions such as next()
, send()
, and close()
.
Running this code, we get the same result as the go code
chronos (master *+) async-monitor $ python3 async.py
check health 3: @3.0
check health 5: @5.0
check health 3: @6.0
check health 3: @9.0
check health 5: @10.0
check health 3: @12.0
check health 5: @15.0
check health 3: @15.0
In case we do not cancel the pending tasks in main
,
this code still run but there will be complaints.
Task was destroyed but it is pending!
Overall, the python asyncio version is slightly easier to write than the go version if the cancellation logic is implemented.
The asyncio.wait
function takes care of most of the business logic for us.
Besides FIRST_COMPLETED
, it also supports
FIRST_EXCEPTION
and ALL_COMPLETED
.