Building a Random Executor in Python — Step‑by‑Step Tutorial

A "Random Executor" is a component that runs tasks chosen at random from a pool rather than in a fixed or priority-driven order. This pattern can be useful for load testing, fuzzing, simulating non‑deterministic behavior, or simple scheduling where fairness and unpredictability are desirable. This tutorial walks through designing, implementing, and testing a Random Executor in Python, with practical examples and variations you can adapt to your needs.
What you’ll learn
- Core design decisions for a Random Executor
- A simple synchronous implementation
- An asynchronous (asyncio) version
- Variants: weighted random selection, exclusion windows, rate limiting
- Testing strategies and examples
- Performance considerations and caveats
1. Design considerations
Before coding, decide on the executor’s goals and constraints:
- Task model: synchronous callables, coroutines, or both?
- Selection policy: uniform random, weighted random, or biased random?
- Concurrency: single-threaded, multi-threaded, or async?
- Lifecycle: one-shot execution, continuous loop, or scheduled times?
- Failure handling: retries, backoff, or move-on?
- Observability: logging, metrics, or tracing?
For this tutorial we’ll implement:
- A simple synchronous Random Executor that picks tasks uniformly at random and runs them.
- An asyncio-based Random Executor for concurrent coroutines.
- A weighted random variant.
- Small additions: exclusion window (avoid immediate repeats), rate limiting, and retries.
2. Simple synchronous Random Executor
This implementation accepts callables (functions) and runs them in a loop, selecting tasks uniformly at random.
```python
# random_executor_sync.py
import random
import time
import threading
from typing import Callable, List, Optional


class RandomExecutor:
    def __init__(self, tasks: Optional[List[Callable]] = None, delay: float = 0.0):
        """
        tasks: list of zero-argument callables (or callables where args are
               bound via functools.partial)
        delay: seconds to wait between task executions (simple pacing)
        """
        self.tasks = tasks or []
        self.delay = delay
        self._stop_event = threading.Event()

    def add_task(self, fn: Callable):
        self.tasks.append(fn)

    def remove_task(self, fn: Callable):
        self.tasks = [t for t in self.tasks if t is not fn]

    def start(self, iterations: Optional[int] = None):
        """
        Start executing tasks. If iterations is None, run indefinitely
        until stop() is called.
        """
        count = 0
        while not self._stop_event.is_set():
            if not self.tasks:
                time.sleep(0.1)
                continue
            task = random.choice(self.tasks)
            try:
                task()
            except Exception as e:
                # simple failure handling: log and continue
                print(f"[RandomExecutor] Task raised: {e!r}")
            count += 1
            if iterations is not None and count >= iterations:
                break
            if self.delay:
                time.sleep(self.delay)

    def stop(self):
        self._stop_event.set()
```
Usage example:
```python
if __name__ == "__main__":
    def task1():
        print("task1")

    def task2():
        print("task2")

    executor = RandomExecutor(delay=0.2)
    executor.add_task(task1)
    executor.add_task(task2)

    # Run in a background thread for demo
    t = threading.Thread(target=executor.start, kwargs={"iterations": 10})
    t.start()
    t.join()
```
Notes:
- Tasks should be short-running: the executor runs them one at a time, so a long or blocking task stalls the whole loop.
- For blocking tasks consider running each task in a worker thread or process pool.
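For example, a minimal sketch of that approach (a standalone helper rather than part of RandomExecutor; the function name and parameters are illustrative) hands each randomly chosen task to a concurrent.futures.ThreadPoolExecutor so that a slow task does not stall the selection loop:

```python
import random
import time
from concurrent.futures import ThreadPoolExecutor


def run_randomly_in_pool(tasks, iterations=10, max_workers=4, delay=0.1):
    """Hypothetical helper: submit randomly chosen tasks to a thread pool."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = []
        for _ in range(iterations):
            # selection stays responsive; the task itself runs in a pool thread
            futures.append(pool.submit(random.choice(tasks)))
            time.sleep(delay)
        for f in futures:
            f.result()  # re-raise any exception a task raised
```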
3. Asynchronous Random Executor (asyncio)
An asyncio version can run multiple tasks concurrently using asyncio.create_task while still choosing which to start randomly.
```python
# random_executor_async.py
import asyncio
import random
from typing import Callable, List, Coroutine, Optional

AsyncCallable = Callable[[], Coroutine]


class AsyncRandomExecutor:
    def __init__(self, tasks: Optional[List[AsyncCallable]] = None,
                 concurrency: int = 5, start_delay: float = 0.0):
        """
        tasks: list of zero-arg callables that return a coroutine (e.g. async functions)
        concurrency: max number of tasks launched per loop iteration
        start_delay: delay between launch rounds
        """
        self.tasks = tasks or []
        self.concurrency = concurrency
        self.start_delay = start_delay
        self._running = True

    def add_task(self, task: AsyncCallable):
        self.tasks.append(task)

    async def run_once(self):
        if not self.tasks:
            await asyncio.sleep(0.1)
            return
        # launch up to concurrency tasks, chosen randomly
        to_launch = min(self.concurrency, len(self.tasks))
        chosen = [random.choice(self.tasks) for _ in range(to_launch)]
        coros = [task() for task in chosen]
        running = [asyncio.create_task(c) for c in coros]
        if running:
            done, pending = await asyncio.wait(running, return_when=asyncio.FIRST_COMPLETED)
            # Optionally cancel pending to limit runtime; here we allow them to continue:
            # for p in pending:
            #     p.cancel()
            # await asyncio.gather(*pending, return_exceptions=True)

    async def start(self, iterations: Optional[int] = None):
        count = 0
        while self._running:
            await self.run_once()
            count += 1
            if iterations is not None and count >= iterations:
                break
            if self.start_delay:
                await asyncio.sleep(self.start_delay)

    def stop(self):
        self._running = False
```
Example usage:
```python
async def sample_task(name, delay=1.0):
    await asyncio.sleep(delay)
    print(f"{name} done")


async def main():
    tasks = [lambda n=n: sample_task(f"task{n}", delay=0.5 + 0.1 * n) for n in range(6)]
    ex = AsyncRandomExecutor(tasks=tasks, concurrency=3, start_delay=0.2)
    await ex.start(iterations=10)


if __name__ == "__main__":
    asyncio.run(main())
```
Notes:
- This example launches up to concurrency tasks per loop iteration. You can adapt the policy (e.g., launch only when running count < concurrency).
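As an illustration of that alternative policy, here is a minimal sketch (a standalone coroutine rather than part of AsyncRandomExecutor; names are illustrative) that uses an asyncio.Semaphore to cap the number of in-flight tasks:

```python
import asyncio
import random


async def run_with_cap(tasks, iterations=20, concurrency=3):
    """Hypothetical variant: never more than `concurrency` coroutines in flight."""
    sem = asyncio.Semaphore(concurrency)

    async def guarded(task):
        async with sem:        # wait here until a slot frees up
            await task()

    launched = [asyncio.create_task(guarded(random.choice(tasks)))
                for _ in range(iterations)]
    await asyncio.gather(*launched, return_exceptions=True)
```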
4. Weighted random selection
To prefer some tasks, use weights. The standard library’s random.choices supports weights.
Synchronous example modification:
```python
import random
from typing import Callable, List, Optional, Sequence


class WeightedRandomExecutor:
    def __init__(self, tasks: Optional[List[Callable]] = None,
                 weights: Optional[Sequence[float]] = None):
        self.tasks = tasks or []
        self.weights = list(weights) if weights is not None else [1.0] * len(self.tasks)

    def add_task(self, fn: Callable, weight: float = 1.0):
        self.tasks.append(fn)
        self.weights.append(weight)

    def pick(self):
        return random.choices(self.tasks, weights=self.weights, k=1)[0]
```
When adjusting weights at runtime, keep the weights list synchronized with tasks.
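A quick usage sketch (the weights shown are arbitrary) to confirm that picks roughly follow the weights:

```python
from collections import Counter

ex = WeightedRandomExecutor()
ex.add_task(lambda: "a", weight=1.0)
ex.add_task(lambda: "b", weight=3.0)

# sample many picks and call each one; expect roughly a 1:3 ratio of "a" to "b"
counts = Counter(ex.pick()() for _ in range(10_000))
print(counts)
```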
5. Avoid immediate repeats (exclusion window)
To reduce the chance of running the same task repeatedly, keep a short history and filter choices.
Simple approach for synchronous executor:
```python
import random
import time
from collections import deque
from typing import Optional

from random_executor_sync import RandomExecutor  # the class from section 2


class NoImmediateRepeatExecutor(RandomExecutor):
    def __init__(self, *args, repeat_window=2, **kwargs):
        super().__init__(*args, **kwargs)
        self.history = deque(maxlen=repeat_window)

    def _choose(self):
        candidates = [t for t in self.tasks if t not in self.history]
        if not candidates:
            candidates = self.tasks
        choice = random.choice(candidates)
        self.history.append(choice)
        return choice

    def start(self, iterations: Optional[int] = None):
        count = 0
        while not self._stop_event.is_set():
            if not self.tasks:
                time.sleep(0.1)
                continue
            task = self._choose()
            try:
                task()
            except Exception as e:
                print(f"[NoImmediateRepeatExecutor] Task raised: {e!r}")
            count += 1
            if iterations is not None and count >= iterations:
                break
            if self.delay:
                time.sleep(self.delay)
```
This reduces short consecutive repeats but preserves randomness.
6. Rate limiting and backoff
If tasks can be costly or an external system needs throttling, add rate limiting or exponential backoff for failing tasks.
Simple token-bucket rate limiter:
```python
import time


class TokenBucket:
    def __init__(self, rate: float, capacity: float):
        self.rate = rate          # tokens added per second
        self.capacity = capacity  # maximum tokens the bucket can hold
        self.tokens = capacity
        self.last = time.monotonic()

    def consume(self, amount: float = 1.0) -> bool:
        now = time.monotonic()
        elapsed = now - self.last
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.last = now
        if self.tokens >= amount:
            self.tokens -= amount
            return True
        return False
```
Call TokenBucket.consume() before running a task; if it returns False, skip the task or wait.
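For example, a minimal pacing loop might look like this (a sketch; the rate and the demo tasks are arbitrary assumptions):

```python
import random
import time

bucket = TokenBucket(rate=5.0, capacity=5.0)   # assumption: roughly 5 runs per second
tasks = [lambda: print("hit endpoint A"), lambda: print("hit endpoint B")]

for _ in range(20):               # bounded demo loop
    if bucket.consume():
        random.choice(tasks)()    # a token is available: run a random task
    else:
        time.sleep(0.05)          # no token yet: back off briefly
```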
Backoff example for retries:
```python
import random
import time


def retry_with_backoff(fn, attempts=3, base=0.1, factor=2.0):
    for i in range(attempts):
        try:
            return fn()
        except Exception:
            if i == attempts - 1:
                raise
            sleep = base * (factor ** i) * (1 + random.random() * 0.1)
            time.sleep(sleep)
```
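To combine this with the executor, one option (a sketch; flaky_call is a hypothetical task) is to bind the retry wrapper around a task with functools.partial and register the result:

```python
import functools
import random


def flaky_call():
    # hypothetical task that fails about 30% of the time
    if random.random() < 0.3:
        raise RuntimeError("transient failure")
    print("flaky_call succeeded")


executor = RandomExecutor(delay=0.1)
executor.add_task(functools.partial(retry_with_backoff, flaky_call, attempts=5))
executor.start(iterations=5)
```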
7. Testing strategies
- Unit tests: test selection distributions by sampling many picks and verifying approximate uniformity or weight-proportions (use chi-squared or simple ratio checks).
- Integration tests: run the executor against deterministic stub tasks that record calls (e.g., append to a list) and assert counts.
- Failure tests: task functions that raise exceptions; assert executor continues.
- Concurrency tests: in async version, verify concurrency limits are honored (use semaphores or counters).
- Property-based testing: assert invariants such as “no more than N repeats in a row” for exclusion window.
A simple example unit test (pytest):
```python
import random


def test_uniform_choice_distribution():
    # RandomExecutor picks with random.choice; sample the same primitive directly
    tasks = [lambda: None for _ in range(5)]
    counts = {i: 0 for i in range(5)}
    for _ in range(10_000):
        choice = random.choice(tasks)
        counts[tasks.index(choice)] += 1
    # counts should be roughly equal (expected ~2000 each)
    for v in counts.values():
        assert abs(v - 2000) < 300
```
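For the exclusion-window variant, a property-style check is straightforward. A minimal sketch, assuming the NoImmediateRepeatExecutor class from section 5 is importable into the test module:

```python
def test_no_immediate_repeat():
    # assumes NoImmediateRepeatExecutor from section 5 is importable here
    tasks = [lambda: None for _ in range(5)]
    ex = NoImmediateRepeatExecutor(tasks=tasks, repeat_window=1)
    previous = None
    for _ in range(1000):
        choice = ex._choose()
        assert choice is not previous   # with a window of 1, no back-to-back repeats
        previous = choice
```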
8. Performance considerations
- Python-level random.choice is fast; selection overhead is typically negligible compared to task runtime.
- For high throughput, avoid long Python-level loops with blocking I/O — use asyncio or worker pools.
- If tasks require isolation or heavy CPU work, use ProcessPoolExecutor.
- If you need millions of selections per second, consider using numpy for vectorized sampling or a compiled extension.
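For instance, a vectorized sketch using numpy (assuming numpy is available; the weights are arbitrary) draws a large batch of task indices in one call; the indices can then be mapped back to tasks outside the hot loop:

```python
import numpy as np

weights = np.array([1.0, 3.0, 1.0, 5.0])     # arbitrary example weights
probs = weights / weights.sum()
rng = np.random.default_rng()

# draw one million task indices in a single vectorized call
indices = rng.choice(len(weights), size=1_000_000, p=probs)
```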
9. Practical examples and use cases
- Load testing: randomly select different request types to send to a server to simulate varied user behavior.
- Fuzz testing: pick random mutations or input generators.
- Game AI: choose random actions from a set with weighted preferences.
- Simulations: create stochastic event scheduling in simulations.
- Fairness testing: ensure no task is starved (combine randomness with constraints).
10. Extensions and advanced ideas
- Reinforcement learning: adapt weights based on observed reward to favor better-performing tasks (a minimal sketch follows this list).
- Zipfian or power-law sampling: model real-world non-uniform access patterns.
- Stateful selection: tasks maintain internal state and selection depends on state.
- Hybrid schedulers: combine random selection with priorities (e.g., probabilistic priority queues).
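As an illustration of the reinforcement-learning idea above, here is a minimal sketch; the multiplicative update rule, the 0.5 reward baseline, and the learning rate are arbitrary choices, not a prescribed algorithm:

```python
import random


class RewardAdaptiveExecutor:
    """Hypothetical variant: each task returns a reward in [0, 1]; its weight adapts."""

    def __init__(self, tasks, learning_rate=0.1):
        self.tasks = list(tasks)
        self.weights = [1.0] * len(self.tasks)
        self.learning_rate = learning_rate

    def run_once(self):
        i = random.choices(range(len(self.tasks)), weights=self.weights, k=1)[0]
        reward = self.tasks[i]()
        # rewards above 0.5 nudge the weight up, below 0.5 nudge it down
        self.weights[i] *= 1 + self.learning_rate * (reward - 0.5)
        self.weights[i] = max(self.weights[i], 1e-6)   # keep weights strictly positive
```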
Conclusion
A Random Executor is straightforward but flexible: start with a simple uniform selection and progressively add features you need — async execution, weighting, exclusion windows, and rate limiting. Test selection distributions and failure handling, and choose the concurrency model appropriate for your tasks (async for I/O, processes for CPU). The code examples above provide a foundation you can extend for load tests, fuzzing, simulations, or lightweight randomized scheduling.