4. Resources and Facilities

In the previous section, we discussed traps and semaphores, which are the two primitive methods in simulus for process synchronization. Simulus provides several advanced mechanisms to coordinate processes and facilitate inter-process communication in order to ease the modeling effort. In this section, we discuss four such mechanisms: resource, store, bucket, and mailbox.

4.1. Resources

A semaphore is a primitive way to represent a resource. A process can atomically increment the semaphore to indicate that a resource has been added to the pool, and atomically decrement it to indicate that a resource has been removed from the pool.

Simulus also provides a high-level abstraction of resources, which makes it easier to write expressive models. A resource models a single-server or multi-server queue: it allows only a limited number of processes to be serviced at any given time. A process arrives and tries to acquire a server at the resource. If a server is available, the process gains access to the resource for as long as the service is required. If no server is available, the process is put on hold and placed in a waiting queue. When another process finishes and releases its server, one of the waiting processes is unblocked and thereby gains access to the resource.

A process is expected to perform the following sequence of actions to use a resource. The process first calls the acquire() method to gain access to a server. This is potentially a blocking call: the process is blocked if there’s no available server. The call returns once a server is available and has been assigned to the process; at that point the process has acquired the resource. The process can then use the resource for as long as it needs to, which is usually modeled with the sleep() method. Afterwards, the same process is expected to call the release() method on the resource to free the server, so that other waiting processes have a chance to gain access to the resource.
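
The acquire, use, and release sequence can be illustrated without a simulator. The following is a minimal plain-Python sketch, not simulus code: the function name serve_fifo and its arguments are made up for illustration, and it assumes first-come, first-served service with arrival times given in non-decreasing order. It computes when each job would gain access to a server and when it would release it.

```python
import heapq

def serve_fifo(arrivals, service_times, servers=1):
    """Return (start, finish) times for jobs served first-come, first-served.

    Hypothetical helper for illustration; arrivals must be sorted.
    """
    free_at = [0.0] * servers          # time at which each server becomes free
    heapq.heapify(free_at)
    schedule = []
    for arrive, service in zip(arrivals, service_times):
        earliest = heapq.heappop(free_at)  # the server that frees up first
        start = max(arrive, earliest)      # the job waits only if no server is free
        finish = start + service           # "release" happens after the service time
        heapq.heappush(free_at, finish)
        schedule.append((start, finish))
    return schedule

schedule = serve_fifo([0.0, 1.0, 2.0], [5.0, 1.0, 1.0], servers=1)
print(schedule)   # [(0.0, 5.0), (5.0, 6.0), (6.0, 7.0)]

# With two servers the second and third jobs no longer wait for the first.
print(serve_fifo([0.0, 1.0, 2.0], [5.0, 1.0, 1.0], servers=2))
# [(0.0, 5.0), (1.0, 2.0), (2.0, 3.0)]
```

With one server, the second and third jobs are held in the queue until the first job releases the server at time 5.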

4.1.1. A Single-Server Queue

The following example uses a resource to simulate a single-server queue. It’s an M/M/1 queue with exponentially distributed inter-arrival times and exponentially distributed service times. We first create a resource with one server (the capacity is one by default). We then create an arrival process, which loops forever, creating a job after sleeping for a random inter-arrival time. Each job is a process. It first tries to acquire() a server from the resource. If no server is available, the job process is suspended until one is. Once the job has been assigned a server, it sleeps for a random service time and then calls release() to free the server before leaving the queue (and terminating).

[1]:
# %load "../examples/basics/mm1.py"
import simulus

from random import seed, expovariate
seed(123)

def job(idx):
    r.acquire()
    print("%g: job(%d) gains access " % (sim.now,idx))
    sim.sleep(expovariate(1))
    print("%g: job(%d) releases" % (sim.now,idx))
    r.release()

def arrival():
    i = 0
    while True:
        i += 1
        sim.sleep(expovariate(0.95))
        print("%g: job(%d) arrives" % (sim.now,i))
        sim.process(job, i)

sim = simulus.simulator()
r = sim.resource()
sim.process(arrival)
sim.run(10)

0.0566152: job(1) arrives
0.0566152: job(1) gains access
0.15264: job(2) arrives
0.272591: job(3) arrives
0.579584: job(1) releases
0.579584: job(2) gains access
0.618484: job(2) releases
0.618484: job(3) gains access
1.38679: job(3) releases
2.70906: job(4) arrives
2.70906: job(4) gains access
3.13407: job(5) arrives
3.31718: job(6) arrives
3.75014: job(7) arrives
4.17767: job(8) arrives
4.47373: job(9) arrives
4.47549: job(10) arrives
4.62019: job(4) releases
4.62019: job(5) gains access
4.71188: job(5) releases
4.71188: job(6) gains access
5.07885: job(11) arrives
5.1551: job(12) arrives
5.55405: job(13) arrives
5.62219: job(6) releases
5.62219: job(7) gains access
6.18015: job(14) arrives
6.28263: job(15) arrives
6.44405: job(16) arrives
7.98027: job(7) releases
7.98027: job(8) gains access
8.00174: job(8) releases
8.00174: job(9) gains access
8.0872: job(17) arrives
8.98396: job(18) arrives
9.30851: job(19) arrives
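
The output shows jobs piling up behind the single server. That is expected for these parameters, as the standard steady-state formulas for an M/M/1 queue suggest. The short calculation below is only a back-of-the-envelope check: the variable names are ours, and a run of 10 time units is far too short to reach steady state.

```python
arrival_rate = 0.95   # lambda, the argument of expovariate(0.95) in arrival()
service_rate = 1.0    # mu, the argument of expovariate(1) in job()

utilization = arrival_rate / service_rate                 # rho = lambda / mu
mean_in_system = utilization / (1 - utilization)          # L = rho / (1 - rho)
mean_time_in_system = 1 / (service_rate - arrival_rate)   # W = 1 / (mu - lambda)

print("utilization: %.2f" % utilization)                  # utilization: 0.95
print("mean jobs in system: %.1f" % mean_in_system)       # mean jobs in system: 19.0
print("mean time in system: %.1f" % mean_time_in_system)  # mean time in system: 20.0
```

A server that is busy 95% of the time leaves little slack, so long queues are the norm rather than the exception in this model.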

The acquire() and release() sequence can be managed as a context: with Python’s with statement, a block of code is executed in between the two calls. The previous example can be rewritten as follows:

[2]:
import simulus

from random import seed, expovariate
seed(123)

def job(idx):
    with r:
        print("%g: job(%d) gains access " % (sim.now,idx))
        sim.sleep(expovariate(1))
        print("%g: job(%d) releases" % (sim.now,idx))

def arrival():
    i = 0
    while True:
        i += 1
        sim.sleep(expovariate(0.95))
        print("%g: job(%d) arrives" % (sim.now,i))
        sim.process(job, i)

sim = simulus.simulator()
r = sim.resource()
sim.process(arrival)
sim.run(10)
0.0566152: job(1) arrives
0.0566152: job(1) gains access
0.15264: job(2) arrives
0.272591: job(3) arrives
0.579584: job(1) releases
0.579584: job(2) gains access
0.618484: job(2) releases
0.618484: job(3) gains access
1.38679: job(3) releases
2.70906: job(4) arrives
2.70906: job(4) gains access
3.13407: job(5) arrives
3.31718: job(6) arrives
3.75014: job(7) arrives
4.17767: job(8) arrives
4.47373: job(9) arrives
4.47549: job(10) arrives
4.62019: job(4) releases
4.62019: job(5) gains access
4.71188: job(5) releases
4.71188: job(6) gains access
5.07885: job(11) arrives
5.1551: job(12) arrives
5.55405: job(13) arrives
5.62219: job(6) releases
5.62219: job(7) gains access
6.18015: job(14) arrives
6.28263: job(15) arrives
6.44405: job(16) arrives
7.98027: job(7) releases
7.98027: job(8) gains access
8.00174: job(8) releases
8.00174: job(9) gains access
8.0872: job(17) arrives
8.98396: job(18) arrives
9.30851: job(19) arrives
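
To see why the two versions behave the same, recall how Python’s with statement works: it calls the object’s __enter__() method before the block and its __exit__() method afterwards, even if the block raises an exception. The toy class below is a hypothetical stand-in that only logs calls; it is not the simulus resource and makes no claim about how simulus implements its context support.

```python
class ToyResource:
    """Hypothetical stand-in that only logs calls; not the simulus class."""

    def __init__(self):
        self.log = []

    def acquire(self):
        self.log.append("acquire")

    def release(self):
        self.log.append("release")

    def __enter__(self):
        self.acquire()          # runs before the body of the with block
        return self

    def __exit__(self, exc_type, exc, tb):
        self.release()          # runs after the body, even on an exception
        return False            # do not swallow exceptions

r = ToyResource()
with r:
    r.log.append("use")
print(r.log)   # ['acquire', 'use', 'release']
```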

4.1.2. Conditional Wait on Resources

A resource is also trappable. That means it can be passed to the simulator’s wait() function as an argument so that we can apply a conditional wait on it. The resource is triggered when the waiting process acquires a server and gains access to the resource.

We illustrate the use of conditional wait on resources with the bank renege example, which was originally written for the SimPy simulator. This example models a bank where customers arrive randomly. The bank counter can serve only one customer at a time. When a customer arrives and the bank counter is occupied, the customer enters a queue. Each customer also has a certain patience. A customer who waits too long may give up, leaving the queue and the bank. (In queuing theory, this is called “reneging.”)

[3]:
# %load "../examples/simpy/bank.py"
"""This example is modified from the simpy's bank renege example; we
use the same settings as simpy so that we can get the same results."""

RANDOM_SEED = 42        # random seed for repeatability
NUM_CUSTOMERS = 5       # total number of customers
INTV_CUSTOMERS = 10.0   # mean time between new customers
MEAN_BANK_TIME = 12.0   # mean time in bank for each customer
MIN_PATIENCE = 1        # min customer patience
MAX_PATIENCE = 3        # max customer patience

import simulus
from random import seed, expovariate, uniform

def source():
    for i in range(NUM_CUSTOMERS):
        sim.process(customer, i)
        sim.sleep(expovariate(1.0/INTV_CUSTOMERS))

def customer(idx):
    arrive = sim.now
    print('%7.4f Customer%02d: Here I am' % (arrive, idx))

    patience = uniform(MIN_PATIENCE, MAX_PATIENCE)
    _, timedout = sim.wait(counter, patience)
    if timedout:
        print('%7.4f Customer%02d: RENEGED after %6.3f' %
              (sim.now, idx, sim.now-arrive))
    else:
        print('%7.4f Customer%02d: Waited %6.3f' %
              (sim.now, idx, sim.now-arrive))
        sim.sleep(expovariate(1.0/MEAN_BANK_TIME))
        print('%7.4f Customer%02d: Finished' % (sim.now, idx))
        counter.release()

print('Bank renege')
seed(RANDOM_SEED)
sim = simulus.simulator()
counter = sim.resource()
sim.process(source)
sim.run()

Bank renege
 0.0000 Customer00: Here I am
 0.0000 Customer00: Waited  0.000
 3.8595 Customer00: Finished
10.2006 Customer01: Here I am
10.2006 Customer01: Waited  0.000
12.7265 Customer02: Here I am
13.9003 Customer02: RENEGED after  1.174
23.7507 Customer01: Finished
34.9993 Customer03: Here I am
34.9993 Customer03: Waited  0.000
37.9599 Customer03: Finished
40.4798 Customer04: Here I am
40.4798 Customer04: Waited  0.000
43.1401 Customer04: Finished
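
The decision each customer faces can be written down as a small pure function. This is a sketch for illustration only: customer_outcome is a hypothetical helper, not part of the example or of simulus, and it assumes we already know when the counter will next be free.

```python
def customer_outcome(arrive, counter_free_at, patience):
    """Decide whether a customer is served or reneges (illustrative only).

    counter_free_at is the time at which the counter next becomes available.
    """
    wait_needed = max(0.0, counter_free_at - arrive)
    if wait_needed > patience:
        return "reneged", arrive + patience     # gives up when patience runs out
    return "served", arrive + wait_needed       # reaches the counter in time

# Customer02 in the output above arrives at 12.7265 while Customer01 holds the
# counter until 23.7507; the printed waiting time before reneging is 1.174.
print(customer_outcome(12.7265, 23.7507, 1.174))

# Customer03 arrives at 34.9993, when the counter is already free.
# The patience value 2.0 here is made up; it does not matter in this case.
print(customer_outcome(34.9993, 23.7507, 2.0))
```

The first call reports a renege at roughly 13.90, in line with the printed output; the second reports that the customer is served on arrival.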

4.2. Stores and Buckets

Stores and buckets are used for synchronizing producer and consumer processes. A store is a facility for storing countable objects, such as jobs in a queue, packets in a network router, and I/O requests at a storage device. A bucket is used for storing uncountable quantities or volumes, such as gas in a tank, water in a reservoir, and battery power in a mobile device.

A store or a bucket has a maximum capacity. For a store, it must be a positive integer. For a bucket, it can be either an integer or a floating-point number. A store or a bucket also reports its current storage level, which ranges between zero and the maximum capacity. One or several processes can put objects into the store or some quantities into the bucket. They are called the “producer processes”. One or several processes can get objects from the store or some quantities from the bucket. They are called the “consumer processes”. Whether a process is a producer or a consumer is determined by the action it performs (put or get); the same process can act as both.

A producer process calls the put() method to deposit one or more objects into the store, or some quantity into the bucket. The put amount is specified as an argument. For the store, the default is one. The current storage level increases accordingly. However, if a producer process tries to put objects or quantities into the store or bucket such that the storage level would exceed the maximum capacity, the producer process is blocked. The process remains blocked until the storage level decreases (when other processes get objects or quantities from the store or bucket) so that there is room for all the objects or quantities being put.

Similarly, a consumer process calls the get() method to retrieve one or more objects from the store or some quantity from the bucket. The get amount is specified as an argument. For the store, the default is one. The current storage level decreases accordingly. If a consumer process tries to get more objects or quantities than are available, the consumer process is blocked. The process remains blocked until the current storage level reaches the requested amount (when other processes put objects or quantities into the store or bucket).
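
The two blocking rules can be summarized in a few lines of plain Python. The class below is a hypothetical model written for this explanation, not the simulus store or bucket: it only answers whether a put or a get would block at the current level, and it ignores any processes that may already be queued.

```python
class LevelModel:
    """Hypothetical model of a store/bucket level; not the simulus class."""

    def __init__(self, capacity, level=0):
        self.capacity = capacity
        self.level = level

    def put_would_block(self, amt=1):
        # A put blocks when the new level would exceed the capacity.
        return self.level + amt > self.capacity

    def get_would_block(self, amt=1):
        # A get blocks when more is requested than is currently stored.
        return amt > self.level

buf = LevelModel(capacity=5, level=3)
print(buf.put_would_block(2))   # False: 3 + 2 fits exactly
print(buf.put_would_block(3))   # True: 3 + 3 exceeds 5
print(buf.get_would_block(3))   # False: exactly 3 are available
print(buf.get_would_block(4))   # True: only 3 are available
```

The same rules apply to a bucket, with floating-point amounts in place of integer counts.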

The major difference between a store and a bucket is that the former is for countable objects and the latter is for uncountable quantities. In addition, a store can optionally hold real Python objects. To do so, the user calls the store’s put() method and passes in a Python object, or a list/tuple of Python objects, using the ‘obj’ keyword argument. In this case, the put amount must match the number of objects. Consumer processes retrieve these Python objects in first-in-first-out order by calling the store’s get() method with the get amount. The same number of Python objects is returned: as a list if the get amount is greater than one, or as the object itself if the get amount is one.
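
The first-in-first-out behavior and the shape of the returned value can be mimicked with a deque. ToyObjectStore below is a hypothetical sketch written to mirror the description above; it ignores capacity and raises an exception where a real consumer process would simply block.

```python
from collections import deque

class ToyObjectStore:
    """Hypothetical FIFO object store; ignores capacity and blocking."""

    def __init__(self):
        self._objs = deque()

    def put(self, amt=1, obj=None):
        objs = list(obj) if isinstance(obj, (list, tuple)) else [obj]
        if len(objs) != amt:
            raise ValueError("put amount must match the number of objects")
        self._objs.extend(objs)

    def get(self, amt=1):
        if amt > len(self._objs):
            raise RuntimeError("a real consumer process would block here")
        taken = [self._objs.popleft() for _ in range(amt)]
        # A single object is returned as is; several come back in a list.
        return taken[0] if amt == 1 else taken

s = ToyObjectStore()
s.put(obj="a")
s.put(amt=2, obj=["b", "c"])
print(s.get())        # a
print(s.get(amt=2))   # ['b', 'c']
```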

4.2.1. Producer-Consumer Problem Revisited

Earlier we used two semaphores to solve the producer-consumer (bounded buffer) problem. In the following example, we use the store facility to achieve the same goal.

[4]:
# %load '../examples/basics/boundbuf-store.py'
import simulus

from random import seed, expovariate, gauss
seed(12345)

bufsiz = 5 # buffer capacity
items_produced = 0 # keep track of the number of items produced
num_producers = 2 # number of producers
num_consumers = 3 # number of consumers

def producer(idx):
    global items_produced
    while True:
        sim.sleep(expovariate(1)) # take time to produce an item
        num = items_produced
        items_produced += 1
        print("%f: p[%d] produces item [%d]" % (sim.now, idx, num))
        s.put(obj=num)
        print("%f: p[%d] stores item [%d] in buffer" %
              (sim.now, idx, num))

def consumer(idx):
    while True:
        num = s.get()
        print("%f: c[%d] retrieves item [%d] from buffer" %
              (sim.now, idx, num))
        sim.sleep(gauss(0.8, 0.2)) # take time to consume the item
        print("%f: c[%d] consumes item[%d]" % (sim.now, idx, num))

sim = simulus.simulator()
s = sim.store(capacity=bufsiz)
for i in range(num_producers):
    sim.process(producer, i)
for i in range(num_consumers):
    sim.process(consumer, i)
sim.run(10)

0.010221: p[1] produces item [0]
0.010221: p[1] stores item [0] in buffer
0.010221: c[0] retrieves item [0] from buffer
0.538916: p[0] produces item [1]
0.538916: p[0] stores item [1] in buffer
0.538916: c[2] retrieves item [1] from buffer
0.752533: c[0] consumes item[0]
0.754168: p[0] produces item [2]
0.754168: p[0] stores item [2] in buffer
0.754168: c[1] retrieves item [2] from buffer
1.521765: c[2] consumes item[1]
1.588897: p[0] produces item [3]
1.588897: p[0] stores item [3] in buffer
1.588897: c[0] retrieves item [3] from buffer
1.608449: c[1] consumes item[2]
1.754371: p[1] produces item [4]
1.754371: p[1] stores item [4] in buffer
1.754371: c[2] retrieves item [4] from buffer
2.156181: p[0] produces item [5]
2.156181: p[0] stores item [5] in buffer
2.156181: c[1] retrieves item [5] from buffer
2.476470: c[0] consumes item[3]
2.580087: p[1] produces item [6]
2.580087: p[1] stores item [6] in buffer
2.580087: c[0] retrieves item [6] from buffer
2.594533: p[0] produces item [7]
2.594533: p[0] stores item [7] in buffer
2.670563: c[2] consumes item[4]
2.670563: c[2] retrieves item [7] from buffer
3.125764: p[0] produces item [8]
3.125764: p[0] stores item [8] in buffer
3.181913: c[1] consumes item[5]
3.181913: c[1] retrieves item [8] from buffer
3.771587: c[2] consumes item[7]
3.826813: p[0] produces item [9]
3.826813: p[0] stores item [9] in buffer
3.826813: c[2] retrieves item [9] from buffer
3.846008: c[0] consumes item[6]
4.037499: p[0] produces item [10]
4.037499: p[0] stores item [10] in buffer
4.037499: c[0] retrieves item [10] from buffer
4.172205: c[1] consumes item[8]
4.455382: p[0] produces item [11]
4.455382: p[0] stores item [11] in buffer
4.455382: c[1] retrieves item [11] from buffer
4.882414: c[2] consumes item[9]
5.017675: c[0] consumes item[10]
5.282205: c[1] consumes item[11]
5.751716: p[1] produces item [12]
5.751716: p[1] stores item [12] in buffer
5.751716: c[2] retrieves item [12] from buffer
6.551144: c[2] consumes item[12]
7.881357: p[0] produces item [13]
7.881357: p[0] stores item [13] in buffer
7.881357: c[0] retrieves item [13] from buffer
8.664728: c[0] consumes item[13]
9.605397: p[1] produces item [14]
9.605397: p[1] stores item [14] in buffer
9.605397: c[1] retrieves item [14] from buffer

In this example, we create a store with capacity being the buffer size. We create two producer processes and three consumer processes. Each producer process repeatedly sleeps for some time and then deposits an item to the store. An integer (the serial number of the created item) is passed in as the real object. Each consumer process first tries to get an item from the store. Since the store uses real objects, an earlier deposited integer will be returned. The consumer process then sleeps for some random time before it repeats in the loop.

4.2.2. Uncountable Quantities

The previous example shows that we can use a store to represent the production and consumption of countable objects. A bucket can be used to represent the storage of uncountable quantities or volumes (such as gas, water, and battery power). The following example, which was originally written for the SimPy simulator, models a gas station and cars arriving at the station randomly for refueling. (Gas should be an uncountable quantity. In the example, however, it is counted as a whole number of liters, so we could use either a store or a bucket. If we changed the original example to use floating-point amounts, we would use the bucket facility.)

[5]:
# %load "../examples/simpy/gas.py"
"""This example is modified from the simpy's gas station refueling
example; we use the same settings as simpy so that we can get the same
results."""

RANDOM_SEED = 42           # random seed for repeatability
GAS_STATION_SIZE = 200     # liters
THRESHOLD = 10             # Threshold for calling the tank truck (in %)
FUEL_TANK_SIZE = 50        # liters
FUEL_TANK_LEVEL = [5, 25]  # Min/max levels of fuel tanks (in liters)
REFUELING_SPEED = 2        # liters / second
TANK_TRUCK_TIME = 300      # Seconds it takes the tank truck to arrive
T_INTER = [30, 300]        # Create a car every [min, max] seconds
SIM_TIME = 1000            # Simulation time in seconds

import random, itertools
import simulus

def car_generator(sim, gas_station, fuel_pump):
    """Generate new cars that arrive at the gas station."""
    for i in itertools.count():
        sim.sleep(random.randint(*T_INTER))
        sim.process(car, 'Car %d' % i, sim, gas_station, fuel_pump)

def car(name, sim, gas_station, fuel_pump):
    """A car arrives at the gas station for refueling.

    It requests one of the gas station's fuel pumps and tries to get
    the desired amount of gas from it. If the stations reservoir is
    depleted, the car has to wait for the tank truck to arrive.

    """

    fuel_tank_level = random.randint(*FUEL_TANK_LEVEL)
    print('%s arriving at gas station at %.1f' % (name, sim.now))

    start = sim.now
    # Request one of the gas pumps
    gas_station.acquire()

    # Get the required amount of fuel
    liters_required = FUEL_TANK_SIZE - fuel_tank_level
    fuel_pump.get(liters_required)

    # The "actual" refueling process takes some time
    sim.sleep(liters_required / REFUELING_SPEED)

    gas_station.release()
    print('%s finished refueling in %.1f seconds.' %
          (name, sim.now - start))

def gas_station_control(sim, fuel_pump):
    """Periodically check the level of the *fuel_pump* and call the tank
    truck if the level falls below a threshold."""

    while True:
        if fuel_pump.level / fuel_pump.capacity * 100 < THRESHOLD:
            # We need to call the tank truck now!
            print('Calling tank truck at %d' % sim.now)

            # Wait for the tank truck to arrive and refuel the station
            sim.wait(sim.process(tank_truck, sim, fuel_pump))

        sim.sleep(10)  # Check every 10 seconds

def tank_truck(sim, fuel_pump):
    """Arrives at the gas station after a certain delay and refuels it."""

    sim.sleep(TANK_TRUCK_TIME)
    print('Tank truck arriving at time %d' % sim.now)

    amount = fuel_pump.capacity - fuel_pump.level
    print('Tank truck refuelling %.1f liters.' % amount)
    fuel_pump.put(amount)

# Setup and start the simulation
print('Gas Station refuelling')
random.seed(RANDOM_SEED)

# Create simulator and start processes
sim = simulus.simulator()
gas_station = sim.resource(2)
fuel_pump = sim.bucket(GAS_STATION_SIZE, GAS_STATION_SIZE)
sim.process(gas_station_control, sim, fuel_pump)
sim.process(car_generator, sim, gas_station, fuel_pump)

# Execute!
sim.run(until=SIM_TIME)

Gas Station refuelling
Car 0 arriving at gas station at 87.0
Car 0 finished refueling in 18.5 seconds.
Car 1 arriving at gas station at 129.0
Car 1 finished refueling in 19.0 seconds.
Car 2 arriving at gas station at 284.0
Car 2 finished refueling in 21.0 seconds.
Car 3 arriving at gas station at 385.0
Car 3 finished refueling in 13.5 seconds.
Car 4 arriving at gas station at 459.0
Calling tank truck at 460
Car 4 finished refueling in 22.0 seconds.
Car 5 arriving at gas station at 705.0
Car 6 arriving at gas station at 750.0
Tank truck arriving at time 760
Tank truck refuelling 188.0 liters.
Car 6 finished refueling in 29.0 seconds.
Car 5 finished refueling in 76.5 seconds.
Car 7 arriving at gas station at 891.0
Car 7 finished refueling in 13.0 seconds.

The gas station has two fuel pumps sharing a fuel tank of 200 liters. The gas station is modeled as a resource (with a capacity of two), and the shared fuel tank is modeled as a bucket (with a capacity of 200). The cars are processes created by the car_generator() process, one at a time with random inter-arrival times. A car arriving at the gas station first tries to acquire a fuel pump from the gas station. Once a fuel pump is available, the car gets the desired amount of gas from the shared tank (the fuel_pump bucket) using the get() method, sleeps for the time it takes to refuel, and then releases the fuel pump before it leaves the gas station.

The gas station has a separate fuel level monitoring process, called gas_station_control(). It checks the fuel level at regular intervals (every 10 seconds). When the fuel level drops below a threshold, a tank truck will be called to refuel the tank. It takes some time for the fuel truck to arrive and when it does, it will fill up the tank using the put() method.
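
The numbers in the output can be checked by hand. The sketch below recomputes the tank level from the refueling durations of cars 0 through 4, which is valid here because none of those cars had to wait for a pump or for fuel, so each duration is pure refueling time. The variable names durations and liters_taken are ours.

```python
GAS_STATION_SIZE = 200   # liters
THRESHOLD = 10           # percent
REFUELING_SPEED = 2      # liters / second

# Refueling durations of cars 0-4, read from the printed output (seconds).
durations = [18.5, 19.0, 21.0, 13.5, 22.0]
liters_taken = [d * REFUELING_SPEED for d in durations]

level = GAS_STATION_SIZE - sum(liters_taken)
percent = level / GAS_STATION_SIZE * 100    # same formula as gas_station_control()
print("%.1f liters left (%.1f%%)" % (level, percent))    # 12.0 liters left (6.0%)
print("call truck:", percent < THRESHOLD)                 # call truck: True
print("refill: %.1f liters" % (GAS_STATION_SIZE - level)) # refill: 188.0 liters
```

The level drops to 12 liters when car 4 takes its fuel at time 459, which is why the next periodic check at time 460 calls the tank truck, and why the truck later puts 188 liters. Cars 5 and 6 each need more than 12 liters, so they block in get() until the truck arrives at time 760.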

4.2.3. Conditional Wait on Stores

Neither the store facility nor the bucket facility is a trappable. That is, we can’t use them directly in a conditional wait. There are two reasons. One is that a store or a bucket has two methods, get() and put(), both of which can block a process, so we would need to specify explicitly which of the two the process should wait on. The other reason is that both get() and put() take arguments. The get() method takes the number of countable objects (for a store) or the amount of uncountable quantity (for a bucket) to be retrieved. The put() method likewise takes the number or amount to be deposited. For a store, one can also pass the objects themselves to put() if so desired.

To allow conditional wait, the store and the bucket provide two methods, getter() and putter(), that return the corresponding trappable for either retrieving from the store or bucket, or depositing into the store or bucket, respectively. The returned trappables can be used in a conditional wait.

In the following example, we create a scenario where a process, called checker(), tries to retrieve jobs from several queues (5 of them in total), one at a time. The queues are modeled as stores. The process also makes sure it checks the time every 2 seconds. To do all this, the checker process performs a conditional wait by calling sim.wait() on the getter() trappables of all stores and specifying a timeout using ‘until’. Depending on whether the timeout happens or not, the process prints out a different message.

[6]:
# %load "../examples/basics/tick.py"
from random import seed, expovariate, randrange
from sys import maxsize
import simulus

NQ = 5 # number of queues
TKTIME = 2 # interval between clock ticks

seed(12345)

def generator():
    jobid = 0 # job id increases monotonically
    while True:
        sim.sleep(expovariate(1))
        q = randrange(0, NQ)
        print("%g: job[%d] enters queue %d" % (sim.now, jobid, q))
        queues[q].put(obj=jobid)
        jobid += 1

def checker():
    tick = TKTIME
    while True:
        t = [q.getter() for q in queues]
        qs, timedout = sim.wait(t, until=tick, method=any)
        if timedout:
            print("%g: clock's ticking" % sim.now)
            tick += TKTIME
        else:
            q = qs.index(True)
            print("%g: job[%d] leaves queue %d" %
                  (sim.now, t[q].retval, q))

sim = simulus.simulator()
queues = [sim.store(capacity=maxsize) for _ in range(NQ)]
sim.process(generator)
sim.process(checker)
sim.run(10)

0.538916: job[0] enters queue 0
0.538916: job[0] leaves queue 0
2: clock's ticking
2.25469: job[1] enters queue 2
2.25469: job[1] leaves queue 2
4: clock's ticking
4.18666: job[2] enters queue 1
4.18666: job[2] leaves queue 1
4.50171: job[3] enters queue 3
4.50171: job[3] leaves queue 3
4.67807: job[4] enters queue 0
4.67807: job[4] leaves queue 0
6: clock's ticking
6.75093: job[5] enters queue 2
6.75093: job[5] leaves queue 2
7.57665: job[6] enters queue 1
7.57665: job[6] leaves queue 1
8: clock's ticking
8.5228: job[7] enters queue 1
8.5228: job[7] leaves queue 1
8.96115: job[8] enters queue 0
8.96115: job[8] leaves queue 0
9.7184: job[9] enters queue 3
9.7184: job[9] leaves queue 3

Note that the getter() trappable keeps the returned value. In this example, the generator() process calls put() to deposit the job’s id into the store. The object can be retrieved by a consumer process when it calls get(); in this case, the conditional wait stores the object in the retval attribute of the getter() trappable when it is triggered.
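
The control flow in checker() relies on the pair returned by the wait: a list of flags, one per trappable, and a timed-out indicator. The function below is a rough plain-Python model of that behavior, not the simulus implementation. It assumes we know in advance when each trappable would trigger, and it assumes that all flags are false on a timeout. All ready times other than 2.25469 are made up.

```python
def wait_any(ready_times, until):
    """Model a wait on several trappables with a deadline (illustrative only).

    ready_times[i] is the hypothetical time at which trappable i triggers.
    Returns (flags, timedout), the same shape that checker() unpacks.
    """
    first = min(ready_times)
    if first > until:
        return [False] * len(ready_times), True
    return [t == first for t in ready_times], False

# Queue 2 receives a job at 2.25469 and the next tick is at time 4.
flags, timedout = wait_any([9.0, 7.5, 2.25469, 8.0, 6.0], until=4)
print(timedout, flags.index(True))   # False 2

# Nothing arrives before the tick at time 2.
flags, timedout = wait_any([9.0, 7.5, 2.25469, 8.0, 6.0], until=2)
print(timedout, flags)               # True [False, False, False, False, False]
```

This matches the pattern in the example: on a timeout the process advances its tick, and otherwise it uses the index of the triggered flag to find the queue that delivered a job.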

4.3. Mailboxes

A mailbox is a facility designed specifically for message passing between processes or functions. A mailbox consists of one or more compartments or partitions. A sender can send a message to one of the partitions of the mailbox (which, by default, is partition 0) with a time delay. The message will be delivered to the designated mailbox partition at the expected time. Messages arriving at a mailbox will be stored in the individual partitions until a receiver retrieves them and removes them from the mailbox.

In Python, the concept of messages (or mails in the sense of a mailbox) takes a broader meaning. Basically, they could be any Python objects. And since Python is a dynamically-typed language, one can also use objects of different types as messages.

A mailbox is designed to be used for both process scheduling (in process-oriented simulation) and direct-event scheduling (for event-driven simulation). In the former, a process can send as many messages to a mailbox as it needs to (by repeatedly calling the mailbox’s send() method). The simulation time does not advance since send() does not block. As a matter of fact, one does not need to call send() in a process context at all. In this sense, a mailbox can be considered as a store with an infinite storage capacity. An important difference, however, is that one can send messages to mailboxes and specify a different delay each time.
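
The effect of per-message delays can be illustrated with a priority queue keyed on delivery time. ToyMailbox below is a hypothetical single-partition model written for this explanation, not the simulus mailbox. In particular, breaking ties by send order is our assumption, and the current time is passed in explicitly because there is no simulator.

```python
import heapq
from itertools import count

class ToyMailbox:
    """Hypothetical single-partition mailbox model; not the simulus class."""

    def __init__(self):
        self._in_flight = []     # heap of (delivery_time, seq, msg)
        self._seq = count()      # tie-breaker so messages are never compared

    def send(self, now, msg, delay=0.0):
        # Sending never blocks; the message simply arrives delay time units later.
        heapq.heappush(self._in_flight, (now + delay, next(self._seq), msg))

    def deliveries(self):
        """Yield (time, msg) pairs in order of delivery time."""
        while self._in_flight:
            when, _, msg = heapq.heappop(self._in_flight)
            yield when, msg

mb = ToyMailbox()
mb.send(0.0, "slow", delay=3.0)
mb.send(0.0, "fast", delay=1.0)
mb.send(0.0, {"any": "object"}, delay=2.0)   # a message can be any Python object
print(list(mb.deliveries()))
# [(1.0, 'fast'), (2.0, {'any': 'object'}), (3.0, 'slow')]
```

All three sends happen at time 0, yet the messages become available at different times, which is something a store with unlimited capacity would not model.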

One or more processes can call the mailbox’s recv() method to receive the messages that have arrived at a mailbox partition. If one or more messages are already stored at the mailbox partition, the process retrieves the messages from the partition and returns them (in a list) without delay. Here, “retrieve” means the messages are removed from the mailbox partition, so subsequent calls to recv() will no longer find them there. If there are no messages in the mailbox partition when recv() is called, the process is suspended pending the arrival of a new message at the designated mailbox partition. Once a message arrives, all processes waiting at the partition will be unblocked and return with the retrieved message.

There are two ways for the recv() method to retrieve the messages. One is to retrieve all messages from the mailbox partition; the other is to retrieve only the first one stored in the mailbox partition. The user specifies the desired behavior by setting the ‘isall’ parameter when calling the recv() method. Regardless of whether a process tries to receive all messages or just one, if multiple processes are waiting to receive at a mailbox partition, a process may wake up upon a message’s arrival and return empty-handed (because another process has already taken the message from the mailbox). When multiple processes are waiting to receive a message, simulus does not dictate which one wakes up first to retrieve the messages. The order is arbitrary, since simulus handles simultaneous events without model-specified ordering.

A mailbox can also be used in the direct-event scheduling context (event-driven approach). In this case, the user can add one or more callback functions to a mailbox partition (using the add_callback() method). A callback function can be any user-defined function. Whenever a message is delivered to a mailbox partition, all callback functions attached to the mailbox partition will be invoked.

Within a callback function, the user has the option to either peek at the mailbox or retrieve from it (or do neither, of course). One can call the peek() method to just look at the messages that have arrived at a mailbox partition. In this case, a list is returned containing all messages stored in the partition. The messages are not removed from the mailbox. One can also call the retrieve() method. In this case, the user is given the option to retrieve just one of the messages or all messages. Again, “retrieve” means to remove the message or messages from the mailbox. As with recv(), the user passes in a boolean ‘isall’ argument to indicate which behavior is desired.
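
The difference between peeking and retrieving comes down to whether the stored messages are removed. ToyPartition below is a hypothetical model of one mailbox partition, not the simulus class. It assumes that retrieving everything is the default, and that retrieving a single message from an empty partition yields None while retrieving all yields an empty list, which is the convention the recv() calls in the example in section 4.3.2 check for.

```python
from collections import deque

class ToyPartition:
    """Hypothetical model of one mailbox partition; not the simulus class."""

    def __init__(self):
        self._msgs = deque()

    def deliver(self, msg):
        self._msgs.append(msg)

    def peek(self):
        return list(self._msgs)            # a copy; nothing is removed

    def retrieve(self, isall=True):
        if isall:
            msgs = list(self._msgs)
            self._msgs.clear()
            return msgs                    # possibly an empty list
        return self._msgs.popleft() if self._msgs else None

part = ToyPartition()
for m in (1, 2, 3):
    part.deliver(m)
print(part.peek())                 # [1, 2, 3]
print(part.retrieve(isall=False))  # 1
print(part.peek())                 # [2, 3]
print(part.retrieve())             # [2, 3]
print(part.retrieve(isall=False))  # None
```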

4.3.1. The PHOLD Example

The following example shows a simple use of a mailbox. We use the PHOLD model, which has been commonly used for benchmarking parallel and distributed simulators. Here we create four processes, called generate(), each representing a node. We also create the same number of mailboxes, one for each process. The generate() process tries to receive a job (a message from the mailbox), one at a time, by calling the recv() method with ‘isall’ set to False. The process then forwards the job to a randomly selected node by calling send() with a random delay. To start the simulation, we initially send 5 jobs to randomly selected nodes.

[7]:
# %load '../examples/basics/phold.py'
import simulus

from random import seed, expovariate, randrange
seed(12345)

job_count = 5
node_count = 4
lookahead = 1

def generate(idx):
    while True:
        msg = mb[idx].recv(isall=False)
        print("%g: node %d received job[%d]," % (sim.now, idx, msg), end=' ')
        target = randrange(node_count)
        delay = expovariate(1)+lookahead
        mb[target].send(msg, delay)
        print("sent to node %d with delay %g" % (target, delay))

sim = simulus.simulator()

mb = [sim.mailbox() for _ in range(node_count)]
for idx in range(node_count):
    sim.process(generate, idx)

# disperse the initial jobs
for idx in range(job_count):
    target = randrange(node_count)
    delay = expovariate(1)+lookahead
    mb[target].send(idx, delay)
    print("init sent job[%d] to node %d with delay %g" %
          (idx, target, delay))

sim.run(5)

init sent job[0] to node 3 with delay 2.31933
init sent job[1] to node 2 with delay 2.93198
init sent job[2] to node 1 with delay 1.31505
init sent job[3] to node 3 with delay 1.17636
init sent job[4] to node 0 with delay 3.07286
1.17636: node 3 received job[3], sent to node 2 with delay 1.82572
1.31505: node 1 received job[2], sent to node 1 with delay 1.94616
2.31933: node 3 received job[0], sent to node 1 with delay 1.43835
2.93198: node 2 received job[1], sent to node 0 with delay 1.75724
3.00208: node 2 received job[3], sent to node 3 with delay 1.8796
3.07286: node 0 received job[4], sent to node 1 with delay 1.16034
3.2612: node 1 received job[2], sent to node 0 with delay 1.21069
3.75768: node 1 received job[0], sent to node 2 with delay 1.38867
4.2332: node 1 received job[4], sent to node 3 with delay 1.41479
4.47189: node 0 received job[2], sent to node 3 with delay 1.00346
4.68922: node 0 received job[1], sent to node 1 with delay 2.47331
4.88168: node 3 received job[3], sent to node 1 with delay 1.33768

4.3.2. Peek versus Retrieve

As mentioned earlier, the recv() method can either retrieve all messages from the mailbox partition, or retrieve only the first one stored in the mailbox partition. In the callback function, the user can also just peek at the mailbox partition (without removing the messages), or retrieve one or all messages.

In the following example, we show a use case of these options.

[8]:
# %load '../examples/basics/delivery.py'
import simulus

from random import seed, expovariate, randint
seed(12345)

def generate():
    num = 0
    while True:
        sim.sleep(expovariate(1))
        print("%g: sent:" % sim.now, end=' ')
        for _ in range(randint(1,5)): # send a bunch
            print("%d" % num, end=' ')
            mb.send(num)
            num += 1
        print('')

def peek():
    msgs = mb.peek() # just peek at the mailbox
    if len(msgs) > 0:
        print("%g: peek() found:" % sim.now, end=' ')
        for m in msgs:
            print("%d" % m, end=' ')
        print('')
    else:
        print("%g: peek() found nothing" % sim.now)

def get_one():
    while True:
        sim.sleep(1)
        msg = mb.recv(isall=False)
        if msg is not None:
            print("%g: get_one() retrieved: %d" % (sim.now, msg))
        else:
            print("%g: get_one() retrieved nothing" % sim.now)

def get_all():
    while True:
        sim.sleep(5)
        msgs = mb.recv()
        if len(msgs) > 0:
            print("%g: get_all() retrieved:" % sim.now, end=' ')
            for m in msgs:
                print("%d" % m, end=' ')
            print('')
        else:
            print("%g: get_all() retrieved nothing" % sim.now)

sim = simulus.simulator()
mb = sim.mailbox()
mb.add_callback(peek)
sim.process(generate)
sim.process(get_one)
sim.process(get_all)
sim.run(8)

0.538916: sent: 0
0.538916: peek() found: 0
1: get_one() retrieved: 0
2.25469: sent: 1 2 3
2.25469: peek() found: 1
2.25469: get_one() retrieved: 1
2.25469: peek() found: 2
2.25469: peek() found: 2 3
3.25469: get_one() retrieved: 2
4.18666: sent: 4 5
4.18666: peek() found: 3 4
4.18666: peek() found: 3 4 5
4.25469: get_one() retrieved: 3
4.50171: sent: 6 7 8 9
4.50171: peek() found: 4 5 6
4.50171: peek() found: 4 5 6 9
4.50171: peek() found: 4 5 6 9 7
4.50171: peek() found: 4 5 6 9 7 8
4.67807: sent: 10
4.67807: peek() found: 4 5 6 9 7 8 10
5: get_all() retrieved: 4 5 6 9 7 8 10
6.75093: sent: 11 12 13
6.75093: peek() found: 11
6.75093: get_one() retrieved: 11
6.75093: peek() found: 12
6.75093: peek() found: 12 13
7.57665: sent: 14 15
7.57665: peek() found: 12 13 14
7.57665: peek() found: 12 13 14 15
7.75093: get_one() retrieved: 12

In this example, we create one mailbox (with a single partition). We attach a callback function peek() to the mailbox (partition zero). Every time the mailbox receives a message, the peek() function is invoked. It simply peeks at the mailbox and lists all the messages in it without removing any of them.

We create three processes. The generate() process sleeps for a random amount of time and then sends a batch of one to five messages to the mailbox. It then sleeps again and repeats the send.

The get_one() process sleeps for 1 second (counted from its previous receive) and then tries to retrieve one message from the mailbox. It blocks if the mailbox is empty. As the output shows, the number of messages in the mailbox tends to grow over time, because the generate() process may send more messages than the get_one() process consumes.
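A rough rate calculation shows why the backlog tends to grow. The sketch below uses only the parameters visible in the example: batches drawn with randint(1,5), gaps drawn with expovariate(1), and a one-second sleep in get_one(). The variable names are ours, and the figures are long-run averages, not a prediction of any particular run.

```python
from statistics import mean

# randint(1, 5) is uniform on 1..5, so the mean batch size is 3
batch_mean = mean(range(1, 6))

# expovariate(1) has mean 1/1 = 1 second between batches
gap_mean = 1.0

# average number of messages sent per second by generate()
arrival_rate = batch_mean / gap_mean

# get_one() sleeps 1 second before each receive, so it removes
# at most one message per second (fewer if it has to block)
max_get_one_rate = 1.0

# lower bound on the average backlog growth when only get_one() consumes
net_growth = arrival_rate - max_get_one_rate

print("arrival rate: %g msgs/s" % arrival_rate)
print("get_one() rate: at most %g msgs/s" % max_get_one_rate)
print("backlog growth: at least %g msgs/s" % net_growth)
```

On average the sender outpaces get_one() by about two messages per second, so the mailbox fills up unless something else drains it.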

To keep the buffered messages from growing indefinitely, the get_all() process sleeps for 5 seconds and then tries to retrieve all messages from the mailbox with a single call to the recv() method. The print-out lets us check that the peeks and retrieves behave as expected.
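The difference between peeking and retrieving can also be sketched without the simulator. The ToyPartition class below is a hypothetical stand-in written for illustration. It is not how simulus implements a mailbox, its method names (deliver, take) are our own, and unlike recv() nothing in it ever blocks. It only shows that a peek leaves the stored messages in place while a retrieve removes them.

```python
from collections import deque

class ToyPartition:
    """Hypothetical stand-in for one mailbox partition (not simulus code)."""

    def __init__(self):
        self._msgs = deque()
        self._callbacks = []

    def add_callback(self, func):
        self._callbacks.append(func)

    def deliver(self, msg):
        # store the message, then notify every attached callback
        self._msgs.append(msg)
        for func in self._callbacks:
            func()

    def peek(self):
        # return a copy; the stored messages stay where they are
        return list(self._msgs)

    def take(self, isall=True):
        # remove and return all messages, or only the oldest one;
        # this toy returns None instead of blocking when empty
        if isall:
            msgs = list(self._msgs)
            self._msgs.clear()
            return msgs
        return self._msgs.popleft() if self._msgs else None

seen = []
part = ToyPartition()
part.add_callback(lambda: seen.append(part.peek()))

for m in (1, 2, 3):
    part.deliver(m)

first = part.take(isall=False)  # removes only the oldest message
rest = part.take()              # removes everything that is left

print(seen)                     # [[1], [1, 2], [1, 2, 3]]
print(first, rest, part.peek()) # 1 [2, 3] []
```

Each peek sees a longer list because nothing has been removed yet. Once the two take() calls have run, the partition is empty.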

4.3.3. Conditional Wait on Mailboxes

Similar to a store, the mailbox facility is not a trappable. That is, one cannot use the mailbox directly for a conditional wait. The reasons are the same. First, a mailbox can have multiple partitions, and the recv() method can block on receiving messages from any one of them. Second, the recv() method is expected to retrieve and return one or all of the messages, depending on the ‘isall’ argument. To allow conditional waits, the mailbox provides the receiver() method, which returns the corresponding trappable for receiving messages from a designated mailbox partition.

In the following example, we model the routine of a mailman and his patron. The overworked mailman is a process that starts the day at 8 o’clock and sorts the mail until 2 o’clock in the afternoon. He then sets out to deliver the mail, which may take between one and five hours. We model this by sending a message to the patron’s mailbox with a random delay. The patron is another process, who comes back from work at 5 o’clock every day. He does a timed wait on the mailbox, which represents checking the mailbox until 6 o’clock. If the mail arrives before 6, the patron receives it. Otherwise, the patron gives up and picks it up the next day.

[9]:
# %load '../examples/basics/mailman.py'
from time import gmtime, strftime
from random import seed, randint
import simulus

def strnow(t=None):
    if t is None: t = sim.now  # an explicit t=0 is a valid time
    return strftime("%H:%M", gmtime(t))

def mailman():
    day = 0
    while True:
        sim.sleep(until=day*24*3600+8*3600) # 8 o'clock
        print('--- day %d ---' % day)

        # sort the mail in the morning and get out for delivery at 2
        # o'clock in the afternoon
        sim.sleep(until=day*24*3600+14*3600)

        # it may take a variable amount of time (between 1 and 5 hours)
        # before the mail can be delivered to people's mailboxes
        delay = randint(3600, 5*3600)
        mb.send('letter for day %d' % day, delay)
        print("%s mail truck's out, expected delivery at %s" %
              (strnow(), strnow(sim.now+delay)))

        # go to the next day
        day += 1

def patron():
    day = 0
    while True:
        # come back from work at 5 PM
        sim.sleep(until=day*24*3600+17*3600)

        # check the mailbox within an hour (until 6 PM)
        rcv = mb.receiver()
        _, timedout = sim.wait(rcv, 3600)
        if timedout:
            print("%s mail truck didn't come today" % strnow())
        else:
            for ltr in rcv.retval:
                print("%s receives '%s'" % (strnow(), ltr))

        # go to the next day
        day += 1

seed(12345)

sim = simulus.simulator()
mb = sim.mailbox()

sim.process(mailman)
sim.process(patron)

sim.run(5*24*3600)

--- day 0 ---
14:00 mail truck's out, expected delivery at 16:53
17:00 receives 'letter for day 0'
--- day 1 ---
14:00 mail truck's out, expected delivery at 18:20
18:00 mail truck didn't come today
--- day 2 ---
14:00 mail truck's out, expected delivery at 15:02
17:00 receives 'letter for day 1'
17:00 receives 'letter for day 2'
--- day 3 ---
14:00 mail truck's out, expected delivery at 18:43
18:00 mail truck didn't come today
--- day 4 ---
14:00 mail truck's out, expected delivery at 18:45
17:00 receives 'letter for day 3'
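The outcome of each day in this output follows from simple clock arithmetic. The sketch below reproduces that reasoning in plain Python, without the simulator. The helper names (clock, pickup_time) are ours, the delays are rounded to the minute from the printed delivery times rather than taken from the actual random draws, and the tie case (a letter arriving at exactly 6 PM) is treated as a timeout purely by assumption.

```python
from time import gmtime, strftime

HOUR = 3600
DAY = 24 * HOUR

def clock(t):
    # format simulated seconds as a time of day
    return strftime("%H:%M", gmtime(t))

def pickup_time(day, delay):
    """Return (time the patron gets the letter, whether he timed out that day)."""
    arrival = day * DAY + 14 * HOUR + delay   # truck leaves at 2 PM
    home = day * DAY + 17 * HOUR              # patron is back at 5 PM
    give_up = home + HOUR                     # he stops checking at 6 PM
    if arrival <= home:
        return home, False                    # letter is already waiting
    if arrival < give_up:                     # tie at 6 PM: assumed timeout
        return arrival, False                 # letter arrives while he waits
    # too late: the letter sits in the mailbox until 5 PM the next day
    return (day + 1) * DAY + 17 * HOUR, True

for day, delay in [(0, 2 * HOUR + 53 * 60),   # delivered 16:53
                   (1, 4 * HOUR + 20 * 60),   # delivered 18:20
                   (0, 3 * HOUR + 30 * 60)]:  # made-up case inside the window
    when, timedout = pickup_time(day, delay)
    print("day %d: picked up on day %d at %s, timed out: %s" %
          (day, when // DAY, clock(when), timedout))
```

The day 1 letter arrives after 6 PM, so the patron times out and finds it at 5 PM on day 2. This matches the two letters received together on day 2 in the output.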