{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "## Resources and Facilities" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In the previous section, we discussed traps and semaphores, which are the two primitive methods in simulus for process synchronization. Simulus also provides several higher-level mechanisms to coordinate processes and facilitate inter-process communication in order to ease the modeling effort. In this section, we discuss four such mechanisms: resources, stores, buckets, and mailboxes." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Resources" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "A semaphore is a primitive way to represent a resource. A process can atomically increment the semaphore to represent a resource being added to the pool. A process can also atomically decrement the semaphore to represent a resource being removed from the pool. \n", "\n", "Simulus also provides a high-level abstraction for resources, making it easier for users to create expressive models. A resource models a single-server or multi-server queue. A resource allows only a limited number of processes to be serviced at any given time. A process arrives and tries to acquire a server at the resource. If there is an available server, the process will gain access to the resource for as long as the service is required. If there isn't an available server, the process will be put on hold and placed in a waiting queue. When another process has finished and released the server, one of the waiting processes will be unblocked and thereby gain access to the resource.\n", "\n", "A process is expected to perform the following sequence of actions to use the resource. The process first calls the `acquire()` method to gain access to a server. This is potentially a blocking call: the process may be blocked if there's no available server. 
The call returns once a server is available and has been assigned to the process; at that point the process has acquired the resource. The process can then use the resource for as long as it needs to; this is usually modeled using the `sleep()` method. Afterwards, the same process is expected to call the `release()` method on the resource to free the server, so that other waiting processes may have a chance to gain access to the resource." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### A Single-Server Queue" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The following example shows the use of a resource for simulating a single-server queue. It's an M/M/1 queue with exponentially distributed inter-arrival times and exponentially distributed service times. We first create a resource with one server (the capacity is one by default). We then create an arrival process, which loops forever: it sleeps for a random inter-arrival time and then creates a job. A job is a process. It first tries to `acquire()` a server from the resource. If the resource is unavailable (no available server), the job process will be suspended. Otherwise, the job is assigned a server. The process sleeps for some random service time and then calls `release()` to free the server before leaving the queue (and terminating)." 
] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "0.0566152: job(1) arrives\n", "0.0566152: job(1) gains access \n", "0.15264: job(2) arrives\n", "0.272591: job(3) arrives\n", "0.579584: job(1) releases\n", "0.579584: job(2) gains access \n", "0.618484: job(2) releases\n", "0.618484: job(3) gains access \n", "1.38679: job(3) releases\n", "2.70906: job(4) arrives\n", "2.70906: job(4) gains access \n", "3.13407: job(5) arrives\n", "3.31718: job(6) arrives\n", "3.75014: job(7) arrives\n", "4.17767: job(8) arrives\n", "4.47373: job(9) arrives\n", "4.47549: job(10) arrives\n", "4.62019: job(4) releases\n", "4.62019: job(5) gains access \n", "4.71188: job(5) releases\n", "4.71188: job(6) gains access \n", "5.07885: job(11) arrives\n", "5.1551: job(12) arrives\n", "5.55405: job(13) arrives\n", "5.62219: job(6) releases\n", "5.62219: job(7) gains access \n", "6.18015: job(14) arrives\n", "6.28263: job(15) arrives\n", "6.44405: job(16) arrives\n", "7.98027: job(7) releases\n", "7.98027: job(8) gains access \n", "8.00174: job(8) releases\n", "8.00174: job(9) gains access \n", "8.0872: job(17) arrives\n", "8.98396: job(18) arrives\n", "9.30851: job(19) arrives\n" ] } ], "source": [ "# %load \"../examples/basics/mm1.py\"\n", "import simulus\n", "\n", "from random import seed, expovariate\n", "seed(123)\n", "\n", "def job(idx):\n", "    r.acquire()\n", "    print(\"%g: job(%d) gains access \" % (sim.now,idx))\n", "    sim.sleep(expovariate(1))\n", "    print(\"%g: job(%d) releases\" % (sim.now,idx))\n", "    r.release()\n", "\n", "def arrival():\n", "    i = 0\n", "    while True:\n", "        i += 1\n", "        sim.sleep(expovariate(0.95))\n", "        print(\"%g: job(%d) arrives\" % (sim.now,i))\n", "        sim.process(job, i)\n", "\n", "sim = simulus.simulator()\n", "r = sim.resource()\n", "sim.process(arrival)\n", "sim.run(10)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The `acquire()` and `release()` sequence can 
be easily managed as a context: Python's `with` statement executes a block of code between the two method calls. The previous example can be rewritten as follows:" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "0.0566152: job(1) arrives\n", "0.0566152: job(1) gains access \n", "0.15264: job(2) arrives\n", "0.272591: job(3) arrives\n", "0.579584: job(1) releases\n", "0.579584: job(2) gains access \n", "0.618484: job(2) releases\n", "0.618484: job(3) gains access \n", "1.38679: job(3) releases\n", "2.70906: job(4) arrives\n", "2.70906: job(4) gains access \n", "3.13407: job(5) arrives\n", "3.31718: job(6) arrives\n", "3.75014: job(7) arrives\n", "4.17767: job(8) arrives\n", "4.47373: job(9) arrives\n", "4.47549: job(10) arrives\n", "4.62019: job(4) releases\n", "4.62019: job(5) gains access \n", "4.71188: job(5) releases\n", "4.71188: job(6) gains access \n", "5.07885: job(11) arrives\n", "5.1551: job(12) arrives\n", "5.55405: job(13) arrives\n", "5.62219: job(6) releases\n", "5.62219: job(7) gains access \n", "6.18015: job(14) arrives\n", "6.28263: job(15) arrives\n", "6.44405: job(16) arrives\n", "7.98027: job(7) releases\n", "7.98027: job(8) gains access \n", "8.00174: job(8) releases\n", "8.00174: job(9) gains access \n", "8.0872: job(17) arrives\n", "8.98396: job(18) arrives\n", "9.30851: job(19) arrives\n" ] } ], "source": [ "import simulus\n", "\n", "from random import seed, expovariate\n", "seed(123)\n", "\n", "def job(idx):\n", "    with r:\n", "        print(\"%g: job(%d) gains access \" % (sim.now,idx))\n", "        sim.sleep(expovariate(1))\n", "        print(\"%g: job(%d) releases\" % (sim.now,idx))\n", "\n", "def arrival():\n", "    i = 0\n", "    while True:\n", "        i += 1\n", "        sim.sleep(expovariate(0.95))\n", "        print(\"%g: job(%d) arrives\" % (sim.now,i))\n", "        sim.process(job, i)\n", "\n", "sim = simulus.simulator()\n", "r = sim.resource()\n", 
"sim.process(arrival)\n", "sim.run(10)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Conditional Wait on Resources" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "A resource is also trappable. That means it can be passed as an argument to the simulator's `wait()` function so that a process can wait on it conditionally. The resource is triggered when the waiting process acquires a server and gains access to the resource. \n", "\n", "We illustrate the use of conditional wait on resources with the bank renege example, which was originally written for the SimPy simulator. This example models a bank where customers arrive randomly. The bank counter can serve only one customer at a time. When a customer arrives and the bank counter is currently occupied, the customer enters a queue. Each customer also has limited patience. If a customer waits too long, the customer may decide to abort, thus leaving the queue and the bank. (In queueing theory, this is called \"reneging\".) 
" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Bank renege\n", " 0.0000 Customer00: Here I am\n", " 0.0000 Customer00: Waited 0.000\n", " 3.8595 Customer00: Finished\n", "10.2006 Customer01: Here I am\n", "10.2006 Customer01: Waited 0.000\n", "12.7265 Customer02: Here I am\n", "13.9003 Customer02: RENEGED after 1.174\n", "23.7507 Customer01: Finished\n", "34.9993 Customer03: Here I am\n", "34.9993 Customer03: Waited 0.000\n", "37.9599 Customer03: Finished\n", "40.4798 Customer04: Here I am\n", "40.4798 Customer04: Waited 0.000\n", "43.1401 Customer04: Finished\n" ] } ], "source": [ "# %load \"../examples/simpy/bank.py\"\n", "\"\"\"This example is modified from the simpy's bank renege example; we\n", "use the same settings as simpy so that we can get the same results.\"\"\"\n", "\n", "RANDOM_SEED = 42 # random seed for repeatability\n", "NUM_CUSTOMERS = 5 # total number of customers\n", "INTV_CUSTOMERS = 10.0 # mean time between new customers\n", "MEAN_BANK_TIME = 12.0 # mean time in bank for each customer\n", "MIN_PATIENCE = 1 # min customer patience\n", "MAX_PATIENCE = 3 # max customer patience\n", "\n", "import simulus\n", "from random import seed, expovariate, uniform\n", "\n", "def source():\n", "    for i in range(NUM_CUSTOMERS):\n", "        sim.process(customer, i)\n", "        sim.sleep(expovariate(1.0/INTV_CUSTOMERS))\n", "\n", "def customer(idx):\n", "    arrive = sim.now\n", "    print('%7.4f Customer%02d: Here I am' % (arrive, idx))\n", "\n", "    patience = uniform(MIN_PATIENCE, MAX_PATIENCE)\n", "    _, timedout = sim.wait(counter, patience)\n", "    if timedout:\n", "        print('%7.4f Customer%02d: RENEGED after %6.3f' %\n", "              (sim.now, idx, sim.now-arrive))\n", "    else:\n", "        print('%7.4f Customer%02d: Waited %6.3f' %\n", "              (sim.now, idx, sim.now-arrive))\n", "        sim.sleep(expovariate(1.0/MEAN_BANK_TIME))\n", "        print('%7.4f Customer%02d: Finished' % (sim.now, idx))\n", "        counter.release()\n", "\n", 
"print('Bank renege')\n", "seed(RANDOM_SEED)\n", "sim = simulus.simulator()\n", "counter = sim.resource()\n", "sim.process(source)\n", "sim.run()\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Stores and Buckets" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Stores and buckets are used for synchronizing producer and consumer processes. A store is a facility for storing countable objects, such as jobs in a queue, packets in a network router, and I/O requests at a storage device. A bucket is used for storing uncountable quantities or volumes, such as gas in a tank, water in a reservoir, and battery power in a mobile device.\n", "\n", "A store or a bucket has a maximum capacity. For a store, it must be a positive integer. For a bucket, it can be either an integer or a floating-point number. A store or a bucket also reports its current storage level, which ranges between zero and the maximum capacity. One or several processes can put objects into the store or some quantities into the bucket. They are called the \"producer processes\". One or several processes can get objects from the store or some quantities from the bucket. They are called the \"consumer processes\". Whether a process is a producer or a consumer is determined by the action it performs (put or get); the same process can be both.\n", "\n", "A producer process calls the `put()` method to deposit one or more objects into the store, or some quantities into the bucket. The put amount is specified as an argument. For the store, the default is one. The current storage level will increase accordingly as a result. However, if a producer process tries to put objects or quantities into the store or bucket such that the storage level would exceed the maximum capacity, the producer process will be blocked. 
The process will remain blocked until the storage level decreases (when some other processes get objects or quantities from the store or bucket) so that there is room for all the objects or quantities being put.\n", "\n", "Similarly, a consumer process calls the `get()` method to retrieve one or more objects from the store or some quantities from the bucket. The get amount is specified as an argument. For the store, the default is one. The current storage level will decrease accordingly as a result. If a consumer process tries to get more objects or quantities than what is available, the consumer process will be blocked. The process will remain blocked until the current storage level reaches at least the requested amount (when some other processes put objects or quantities into the store or bucket).\n", "\n", "The major difference between a store and a bucket is that the former is for countable objects and the latter is for uncountable quantities. In addition, a store can optionally hold real Python objects. To do so, the user calls the store's `put()` method and passes in a Python object, or a list/tuple of Python objects, using the `obj` keyword argument. In this case, the put amount must match the number of objects. These Python objects can be retrieved in a first-in-first-out fashion by consumer processes calling the `get()` method of the store, which specifies the get amount. The same number of Python objects will be returned, either in a list if the get amount is greater than one, or as the object itself if the get amount is one." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Producer-Consumer Problem Revisited" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Earlier we used two semaphores to solve the producer-consumer (bounded buffer) problem. In the following example, we use the store facility to achieve the same goal." 
] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "0.010221: p[1] produces item [0]\n", "0.010221: p[1] stores item [0] in buffer\n", "0.010221: c[0] retrieves item [0] from buffer\n", "0.538916: p[0] produces item [1]\n", "0.538916: p[0] stores item [1] in buffer\n", "0.538916: c[2] retrieves item [1] from buffer\n", "0.752533: c[0] consumes item[0]\n", "0.754168: p[0] produces item [2]\n", "0.754168: p[0] stores item [2] in buffer\n", "0.754168: c[1] retrieves item [2] from buffer\n", "1.521765: c[2] consumes item[1]\n", "1.588897: p[0] produces item [3]\n", "1.588897: p[0] stores item [3] in buffer\n", "1.588897: c[0] retrieves item [3] from buffer\n", "1.608449: c[1] consumes item[2]\n", "1.754371: p[1] produces item [4]\n", "1.754371: p[1] stores item [4] in buffer\n", "1.754371: c[2] retrieves item [4] from buffer\n", "2.156181: p[0] produces item [5]\n", "2.156181: p[0] stores item [5] in buffer\n", "2.156181: c[1] retrieves item [5] from buffer\n", "2.476470: c[0] consumes item[3]\n", "2.580087: p[1] produces item [6]\n", "2.580087: p[1] stores item [6] in buffer\n", "2.580087: c[0] retrieves item [6] from buffer\n", "2.594533: p[0] produces item [7]\n", "2.594533: p[0] stores item [7] in buffer\n", "2.670563: c[2] consumes item[4]\n", "2.670563: c[2] retrieves item [7] from buffer\n", "3.125764: p[0] produces item [8]\n", "3.125764: p[0] stores item [8] in buffer\n", "3.181913: c[1] consumes item[5]\n", "3.181913: c[1] retrieves item [8] from buffer\n", "3.771587: c[2] consumes item[7]\n", "3.826813: p[0] produces item [9]\n", "3.826813: p[0] stores item [9] in buffer\n", "3.826813: c[2] retrieves item [9] from buffer\n", "3.846008: c[0] consumes item[6]\n", "4.037499: p[0] produces item [10]\n", "4.037499: p[0] stores item [10] in buffer\n", "4.037499: c[0] retrieves item [10] from buffer\n", "4.172205: c[1] consumes item[8]\n", "4.455382: p[0] produces item [11]\n", 
"4.455382: p[0] stores item [11] in buffer\n", "4.455382: c[1] retrieves item [11] from buffer\n", "4.882414: c[2] consumes item[9]\n", "5.017675: c[0] consumes item[10]\n", "5.282205: c[1] consumes item[11]\n", "5.751716: p[1] produces item [12]\n", "5.751716: p[1] stores item [12] in buffer\n", "5.751716: c[2] retrieves item [12] from buffer\n", "6.551144: c[2] consumes item[12]\n", "7.881357: p[0] produces item [13]\n", "7.881357: p[0] stores item [13] in buffer\n", "7.881357: c[0] retrieves item [13] from buffer\n", "8.664728: c[0] consumes item[13]\n", "9.605397: p[1] produces item [14]\n", "9.605397: p[1] stores item [14] in buffer\n", "9.605397: c[1] retrieves item [14] from buffer\n" ] } ], "source": [ "# %load '../examples/basics/boundbuf-store.py'\n", "import simulus\n", "\n", "from random import seed, expovariate, gauss\n", "seed(12345)\n", "\n", "bufsiz = 5 # buffer capacity\n", "items_produced = 0 # keep track of the number of items produced\n", "num_producers = 2 # number of producers\n", "num_consumers = 3 # number of consumers\n", "\n", "def producer(idx):\n", "    global items_produced\n", "    while True:\n", "        sim.sleep(expovariate(1)) # take time to produce an item\n", "        num = items_produced\n", "        items_produced += 1\n", "        print(\"%f: p[%d] produces item [%d]\" % (sim.now, idx, num))\n", "        s.put(obj=num)\n", "        print(\"%f: p[%d] stores item [%d] in buffer\" % \n", "              (sim.now, idx, num))\n", "\n", "def consumer(idx):\n", "    while True:\n", "        num = s.get()\n", "        print(\"%f: c[%d] retrieves item [%d] from buffer\" %\n", "              (sim.now, idx, num))\n", "        sim.sleep(gauss(0.8, 0.2)) # take time to consume the item\n", "        print(\"%f: c[%d] consumes item[%d]\" % (sim.now, idx, num)) \n", "\n", "sim = simulus.simulator()\n", "s = sim.store(capacity=bufsiz)\n", "for i in range(num_producers): \n", "    sim.process(producer, i)\n", "for i in range(num_consumers):\n", "    sim.process(consumer, i)\n", "sim.run(10)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ 
"In this example, we create a store whose capacity is the buffer size. We create two producer processes and three consumer processes. Each producer process repeatedly sleeps for some time and then deposits an item into the store. An integer (the serial number of the created item) is passed in as the real object. Each consumer process first tries to get an item from the store. Since the store holds real objects, the earliest deposited integer will be returned. The consumer process then sleeps for some random time before it repeats the loop." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Uncountable Quantities" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The previous example shows that we can use a store to represent the production and consumption of countable objects. A bucket can be used to represent the storage of uncountable quantities or volumes (gas, water, and battery power). The following example, which was originally written for the SimPy simulator, models a gas station and cars arriving at the station randomly for refueling. *(Gas is in principle an uncountable quantity. In this example, however, it is counted in whole liters. As such, we could use either a store or a bucket. 
If we were to change the original example to use floating-point amounts, we would use the bucket facility.)* " ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Gas Station refuelling\n", "Car 0 arriving at gas station at 87.0\n", "Car 0 finished refueling in 18.5 seconds.\n", "Car 1 arriving at gas station at 129.0\n", "Car 1 finished refueling in 19.0 seconds.\n", "Car 2 arriving at gas station at 284.0\n", "Car 2 finished refueling in 21.0 seconds.\n", "Car 3 arriving at gas station at 385.0\n", "Car 3 finished refueling in 13.5 seconds.\n", "Car 4 arriving at gas station at 459.0\n", "Calling tank truck at 460\n", "Car 4 finished refueling in 22.0 seconds.\n", "Car 5 arriving at gas station at 705.0\n", "Car 6 arriving at gas station at 750.0\n", "Tank truck arriving at time 760\n", "Tank truck refuelling 188.0 liters.\n", "Car 6 finished refueling in 29.0 seconds.\n", "Car 5 finished refueling in 76.5 seconds.\n", "Car 7 arriving at gas station at 891.0\n", "Car 7 finished refueling in 13.0 seconds.\n" ] } ], "source": [ "# %load \"../examples/simpy/gas.py\"\n", "\"\"\"This example is modified from the simpy's gas station refueling\n", "example; we use the same settings as simpy so that we can get the same\n", "results.\"\"\"\n", "\n", "RANDOM_SEED = 42 # random seed for repeatability\n", "GAS_STATION_SIZE = 200 # liters\n", "THRESHOLD = 10 # Threshold for calling the tank truck (in %)\n", "FUEL_TANK_SIZE = 50 # liters\n", "FUEL_TANK_LEVEL = [5, 25] # Min/max levels of fuel tanks (in liters)\n", "REFUELING_SPEED = 2 # liters / second\n", "TANK_TRUCK_TIME = 300 # Seconds it takes the tank truck to arrive\n", "T_INTER = [30, 300] # Create a car every [min, max] seconds\n", "SIM_TIME = 1000 # Simulation time in seconds\n", "\n", "import random, itertools\n", "import simulus\n", "\n", "def car_generator(sim, gas_station, fuel_pump):\n", "    \"\"\"Generate new cars that arrive at 
the gas station.\"\"\"\n", "    for i in itertools.count():\n", "        sim.sleep(random.randint(*T_INTER))\n", "        sim.process(car, 'Car %d' % i, sim, gas_station, fuel_pump)\n", "\n", "def car(name, sim, gas_station, fuel_pump):\n", "    \"\"\"A car arrives at the gas station for refueling.\n", "\n", "    It requests one of the gas station's fuel pumps and tries to get\n", "    the desired amount of gas from it. If the station's reservoir is\n", "    depleted, the car has to wait for the tank truck to arrive.\n", "\n", "    \"\"\"\n", "\n", "    fuel_tank_level = random.randint(*FUEL_TANK_LEVEL)\n", "    print('%s arriving at gas station at %.1f' % (name, sim.now))\n", "\n", "    start = sim.now\n", "    # Request one of the gas pumps\n", "    gas_station.acquire()\n", "\n", "    # Get the required amount of fuel\n", "    liters_required = FUEL_TANK_SIZE - fuel_tank_level\n", "    fuel_pump.get(liters_required)\n", "\n", "    # The \"actual\" refueling process takes some time\n", "    sim.sleep(liters_required / REFUELING_SPEED)\n", "\n", "    gas_station.release()\n", "    print('%s finished refueling in %.1f seconds.' %\n", "          (name, sim.now - start))\n", "\n", "def gas_station_control(sim, fuel_pump):\n", "    \"\"\"Periodically check the level of the *fuel_pump* and call the tank\n", "    truck if the level falls below a threshold.\"\"\"\n", "\n", "    while True:\n", "        if fuel_pump.level / fuel_pump.capacity * 100 < THRESHOLD:\n", "            # We need to call the tank truck now!\n", "            print('Calling tank truck at %d' % sim.now)\n", "\n", "            # Wait for the tank truck to arrive and refuel the station\n", "            sim.wait(sim.process(tank_truck, sim, fuel_pump))\n", "\n", "        sim.sleep(10) # Check every 10 seconds\n", "\n", "def tank_truck(sim, fuel_pump):\n", "    \"\"\"Arrives at the gas station after a certain delay and refuels it.\"\"\"\n", "\n", "    sim.sleep(TANK_TRUCK_TIME)\n", "    print('Tank truck arriving at time %d' % sim.now)\n", "\n", "    ammount = fuel_pump.capacity - fuel_pump.level\n", "    print('Tank truck refuelling %.1f liters.' 
% ammount)\n", "    fuel_pump.put(ammount)\n", "\n", "# Setup and start the simulation\n", "print('Gas Station refuelling')\n", "random.seed(RANDOM_SEED)\n", "\n", "# Create simulator and start processes\n", "sim = simulus.simulator()\n", "gas_station = sim.resource(2)\n", "fuel_pump = sim.bucket(GAS_STATION_SIZE, GAS_STATION_SIZE)\n", "sim.process(gas_station_control, sim, fuel_pump)\n", "sim.process(car_generator, sim, gas_station, fuel_pump)\n", "\n", "# Execute!\n", "sim.run(until=SIM_TIME)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The gas station has two fuel pumps sharing a fuel tank of 200 liters. The gas station is modeled as a resource (with a capacity of two), and the shared fuel tank is modeled as a bucket (with a capacity of 200). The cars are processes created by the `car_generator()` process, one at a time with some random inter-arrival time. A car arriving at the gas station first tries to acquire a fuel pump from the gas station. Once a fuel pump is available, the car gets the desired amount of gas from the shared fuel tank (the `fuel_pump` bucket) using the `get()` method, sleeps for the time it takes to refuel, and then releases the fuel pump before it leaves the gas station.\n", "\n", "The gas station has a separate fuel level monitoring process, called `gas_station_control()`. It checks the fuel level at regular intervals (every 10 seconds). When the fuel level drops below a threshold, a tank truck will be called to refuel the tank. It takes some time for the tank truck to arrive, and when it does, it will fill up the tank using the `put()` method." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Conditional Wait on Stores" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Neither the store facility nor the bucket facility is a trappable. That is, we can't use them directly in a conditional wait. There are two reasons. One is that a store or a bucket has two methods, `get()` and `put()`, both of which could block a process. 
We would need to explicitly specify on which of the two the process should be waiting. The other reason is that both the `get()` and `put()` methods can take arguments. The `get()` method can take the number of countable objects (for a store) or the amount of uncountable quantities (for a bucket) to be retrieved. The `put()` method can likewise take the number or amount to be deposited. For a store, `put()` can also take the Python objects to be deposited, if so desired. \n", "\n", "To allow conditional wait, the store and the bucket provide two methods, `getter()` and `putter()`, that return the corresponding trappable for either retrieving from the store or bucket, or depositing into the store or bucket, respectively. The returned trappables can be used in a conditional wait.\n", "\n", "In the following example, we create a scenario where a process, called `checker()`, tries to retrieve jobs from several queues (5 of them in total), one at a time. The queues are modeled as stores. The process also makes sure it checks the clock every 2 seconds. To do all this, the checker process performs a conditional wait by calling the `sim.wait()` function on the `getter()` trappables from all stores and specifies a timeout using the `until` argument. Depending on whether the timeout happens or not, the process prints a different message." 
] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "0.538916: job[0] enters queue 0\n", "0.538916: job[0] leaves queue 0\n", "2: clock's ticking\n", "2.25469: job[1] enters queue 2\n", "2.25469: job[1] leaves queue 2\n", "4: clock's ticking\n", "4.18666: job[2] enters queue 1\n", "4.18666: job[2] leaves queue 1\n", "4.50171: job[3] enters queue 3\n", "4.50171: job[3] leaves queue 3\n", "4.67807: job[4] enters queue 0\n", "4.67807: job[4] leaves queue 0\n", "6: clock's ticking\n", "6.75093: job[5] enters queue 2\n", "6.75093: job[5] leaves queue 2\n", "7.57665: job[6] enters queue 1\n", "7.57665: job[6] leaves queue 1\n", "8: clock's ticking\n", "8.5228: job[7] enters queue 1\n", "8.5228: job[7] leaves queue 1\n", "8.96115: job[8] enters queue 0\n", "8.96115: job[8] leaves queue 0\n", "9.7184: job[9] enters queue 3\n", "9.7184: job[9] leaves queue 3\n" ] } ], "source": [ "# %load \"../examples/basics/tick.py\"\n", "from random import seed, expovariate, randrange\n", "from sys import maxsize \n", "import simulus\n", "\n", "NQ = 5 # number of queues\n", "TKTIME = 2 # interval between clock ticks\n", "\n", "seed(12345)\n", "\n", "def generator():\n", "    jobid = 0 # job id increases monotonically \n", "    while True:\n", "        sim.sleep(expovariate(1))\n", "        q = randrange(0, NQ)\n", "        print(\"%g: job[%d] enters queue %d\" % (sim.now, jobid, q))\n", "        queues[q].put(obj=jobid)\n", "        jobid += 1\n", "\n", "def checker():\n", "    tick = TKTIME\n", "    while True:\n", "        t = [q.getter() for q in queues]\n", "        qs, timedout = sim.wait(t, until=tick, method=any)\n", "        if timedout:\n", "            print(\"%g: clock's ticking\" % sim.now)\n", "            tick += TKTIME\n", "        else:\n", "            q = qs.index(True)\n", "            print(\"%g: job[%d] leaves queue %d\" % \n", "                  (sim.now, t[q].retval, q))\n", "\n", "sim = simulus.simulator()\n", "queues = [sim.store(capacity=maxsize) for _ in range(NQ)]\n", "sim.process(generator)\n", 
"sim.process(checker)\n", "sim.run(10)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Note that the `getter()` trappable keeps the retrieved value. In this example, the `generator()` process calls `put()` to deposit the job's id into the store. The object can be retrieved by a consumer process when it calls `get()`; in this case, the conditional wait places the object in the trappable's `retval` attribute when the `getter()` trappable is triggered." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Mailboxes" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "A mailbox is a facility designed specifically for message passing between processes or functions. A mailbox consists of one or more compartments or partitions. A sender can send a message to one of the partitions of the mailbox (which, by default, is partition 0) with a time delay. The message will be delivered to the designated mailbox partition at the expected time. Messages arriving at a mailbox will be stored in the individual partitions until a receiver retrieves them and removes them from the mailbox.\n", "\n", "In Python, the concept of messages (or mails in the sense of a mailbox) takes a broader meaning. Basically, they could be any Python objects. And since Python is a dynamically typed language, one can also use objects of different types as messages.\n", "\n", "A mailbox is designed to be used for both process scheduling (in process-oriented simulation) and direct-event scheduling (for event-driven simulation). In the former, a process can send as many messages to a mailbox as it needs to (by repeatedly calling the mailbox's `send()` method). The simulation time does not advance since `send()` does not block. As a matter of fact, one does not need to call `send()` in a process context at all. In this sense, a mailbox can be considered a store with an infinite storage capacity. 
An important difference, however, is that one can send messages to mailboxes and specify a different delay each time.\n", "\n", "One or more processes can call the mailbox's `recv()` method to try to receive the messages that have arrived at a mailbox partition. If there are one or more messages already stored at the mailbox partition, the process will retrieve the messages from the partition and return them (in a list) without delay. Here, \"retrieve\" means the messages will indeed be removed from the mailbox partition. As a result, subsequent calls to `recv()` will no longer find the retrieved messages in the mailbox partition. If there are no messages currently in the mailbox partition when calling `recv()`, the processes will be suspended pending the arrival of a new message at the designated mailbox partition. Once a message arrives, *all* waiting processes at the partition will be unblocked and return with the retrieved message.\n", "\n", "There are two ways for the `recv()` method to retrieve the messages. One is to retrieve all messages from the mailbox partition; the other is to retrieve only the first one stored in the mailbox partition. The user can specify which behavior is desirable by setting the `isall` parameter when calling the `recv()` method. Regardless of whether a process tries to receive all messages or just one message, if there are multiple processes waiting to receive at a mailbox partition, it is possible that a process wakes up upon a message's arrival and returns empty-handed (because another process may have taken the message from the mailbox). Simulus does not dictate, when multiple processes are waiting to receive a message, which one will wake up first to retrieve the messages. The order is arbitrary, since simulus handles simultaneous events without a model-specified ordering.\n", "\n", "A mailbox can also be used in the direct-event scheduling context (event-driven approach). 
In this case, the user can add one or more callback functions to a mailbox partition (using the `add_callback()` method). A callback function can be any user-defined function. Whenever a message is delivered to a mailbox partition, *all* callback functions attached to the mailbox partition will be invoked.\n", "\n", "Within a callback function, the user has the option to either peek at the mailbox or retrieve from it (or do neither, of course). One can call the `peek()` method to just look at the messages that have arrived at a mailbox partition. In this case, a list is returned containing all stored messages in the partition. The messages are not removed from the mailbox. One can also call the `retrieve()` method. In this case, the user is given the option to retrieve just one of the messages or all messages. Again, \"retrieve\" means to remove the message or messages from the mailbox. As with `recv()`, the user passes in a boolean 'isall' argument to indicate which behavior is desirable." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### The PHOLD Example" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The following example shows a simple use of a mailbox. We use the PHOLD model, which has been commonly used for benchmarking parallel and distributed simulators. Here we create four processes, each running the `generate()` function and representing a node. We also create the same number of mailboxes, one for each process. The `generate()` process tries to receive a job (a message from the mailbox), one at a time, by calling the `recv()` method with 'isall' set to False. The process then forwards the job to a randomly selected node by calling `send()` with a random delay. To start the simulation, we initially send five jobs to randomly selected nodes. 
" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "init sent job[0] to node 3 with delay 2.31933\n", "init sent job[1] to node 2 with delay 2.93198\n", "init sent job[2] to node 1 with delay 1.31505\n", "init sent job[3] to node 3 with delay 1.17636\n", "init sent job[4] to node 0 with delay 3.07286\n", "1.17636: node 3 received job[3], sent to node 2 with delay 1.82572\n", "1.31505: node 1 received job[2], sent to node 1 with delay 1.94616\n", "2.31933: node 3 received job[0], sent to node 1 with delay 1.43835\n", "2.93198: node 2 received job[1], sent to node 0 with delay 1.75724\n", "3.00208: node 2 received job[3], sent to node 3 with delay 1.8796\n", "3.07286: node 0 received job[4], sent to node 1 with delay 1.16034\n", "3.2612: node 1 received job[2], sent to node 0 with delay 1.21069\n", "3.75768: node 1 received job[0], sent to node 2 with delay 1.38867\n", "4.2332: node 1 received job[4], sent to node 3 with delay 1.41479\n", "4.47189: node 0 received job[2], sent to node 3 with delay 1.00346\n", "4.68922: node 0 received job[1], sent to node 1 with delay 2.47331\n", "4.88168: node 3 received job[3], sent to node 1 with delay 1.33768\n" ] } ], "source": [ "# %load '../examples/basics/phold.py'\n", "import simulus\n", "\n", "from random import seed, expovariate, randrange\n", "seed(12345)\n", "\n", "job_count = 5\n", "node_count = 4\n", "lookahead = 1\n", "\n", "def generate(idx):\n", " while True:\n", " msg = mb[idx].recv(isall=False)\n", " print(\"%g: node %d received job[%d],\" % (sim.now, idx, msg), end=' ')\n", " target = randrange(node_count)\n", " delay = expovariate(1)+lookahead\n", " mb[target].send(msg, delay)\n", " print(\"sent to node %d with delay %g\" % (target, delay))\n", "\n", "sim = simulus.simulator()\n", "\n", "mb = [sim.mailbox() for _ in range(node_count)]\n", "for idx in range(node_count):\n", " sim.process(generate, idx)\n", "\n", "# disperse the 
initial jobs\n", "for idx in range(job_count):\n", " target = randrange(node_count)\n", " delay = expovariate(1)+lookahead\n", " mb[target].send(idx, delay)\n", " print(\"init sent job[%d] to node %d with delay %g\" %\n", " (idx, target, delay))\n", "\n", "sim.run(5)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Peek versus Retrieve" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As mentioned earlier, the `recv()` method can either retrieve all messages from the mailbox partition, or retrieve only the first one stored in the mailbox partition. In the callback function, the user can also just peek at the mailbox partition (without removing the messages), or retrieve one or all messages.\n", "\n", "In the following example, we show a use case of these options. " ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "0.538916: sent: 0 \n", "0.538916: peek() found: 0 \n", "1: get_one() retrieved: 0\n", "2.25469: sent: 1 2 3 \n", "2.25469: peek() found: 1 \n", "2.25469: get_one() retrieved: 1\n", "2.25469: peek() found: 2 \n", "2.25469: peek() found: 2 3 \n", "3.25469: get_one() retrieved: 2\n", "4.18666: sent: 4 5 \n", "4.18666: peek() found: 3 4 \n", "4.18666: peek() found: 3 4 5 \n", "4.25469: get_one() retrieved: 3\n", "4.50171: sent: 6 7 8 9 \n", "4.50171: peek() found: 4 5 6 \n", "4.50171: peek() found: 4 5 6 9 \n", "4.50171: peek() found: 4 5 6 9 7 \n", "4.50171: peek() found: 4 5 6 9 7 8 \n", "4.67807: sent: 10 \n", "4.67807: peek() found: 4 5 6 9 7 8 10 \n", "5: get_all() retrieved: 4 5 6 9 7 8 10 \n", "6.75093: sent: 11 12 13 \n", "6.75093: peek() found: 11 \n", "6.75093: get_one() retrieved: 11\n", "6.75093: peek() found: 12 \n", "6.75093: peek() found: 12 13 \n", "7.57665: sent: 14 15 \n", "7.57665: peek() found: 12 13 14 \n", "7.57665: peek() found: 12 13 14 15 \n", "7.75093: get_one() retrieved: 12\n" ] } ], "source": [ "# %load 
'../examples/basics/delivery.py'\n", "import simulus\n", "\n", "from random import seed, expovariate, randint\n", "seed(12345)\n", "\n", "def generate():\n", " num = 0\n", " while True:\n", " sim.sleep(expovariate(1))\n", " print(\"%g: sent:\" % sim.now, end=' ')\n", " for _ in range(randint(1,5)): # send a bunch\n", " print(\"%d\" % num, end=' ')\n", " mb.send(num)\n", " num += 1\n", " print('')\n", "\n", "def peek():\n", " msgs = mb.peek() # just peek at the mailbox\n", " if len(msgs) > 0:\n", " print(\"%g: peek() found:\" % sim.now, end=' ')\n", " for m in msgs:\n", " print(\"%d\" % m, end=' ')\n", " print('')\n", " else:\n", " print(\"%g: peek() found nothing\" % sim.now)\n", "\n", "def get_one():\n", " while True:\n", " sim.sleep(1)\n", " msg = mb.recv(isall=False)\n", " if msg is not None:\n", " print(\"%g: get_one() retrieved: %d\" % (sim.now, msg))\n", " else:\n", " print(\"%g: get_one() retrieved nothing\" % sim.now)\n", "\n", "def get_all():\n", " while True:\n", " sim.sleep(5)\n", " msgs = mb.recv()\n", " if len(msgs) > 0:\n", " print(\"%g: get_all() retrieved:\" % sim.now, end=' ')\n", " for m in msgs:\n", " print(\"%d\" % m, end = ' ')\n", " print('')\n", " else:\n", " print(\"%g: get_all() retrieved nothing\" % sim.now)\n", "\n", "sim = simulus.simulator()\n", "mb = sim.mailbox()\n", "mb.add_callback(peek)\n", "sim.process(generate)\n", "sim.process(get_one)\n", "sim.process(get_all)\n", "sim.run(8)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In this example, we create one mailbox (with a single partition). We attach a callback function `peek()` to the mailbox (partition zero). Every time the mailbox receives a message, the `peek()` function will be invoked and the function literally just takes a peek at the mailbox and lists all the messages therein.\n", "\n", "We create three processes. The `generate()` process sleeps for some random time and then sends a bunch of messages (ranging between 1 and 5) to the mailbox. 
It then sleeps again and repeats the send. \n", "\n", "The `get_one()` process sleeps for 1 second (from the previous receive) and tries to retrieve one message from the mailbox. It may be blocked if the mailbox is empty. As we can see from the example, the number of messages in the mailbox seems to increase over time, since the `generate()` process may send more messages than the `get_one()` process consumes. \n", "\n", "To keep the buffered messages from growing indefinitely, the `get_all()` process sleeps for 5 seconds and then tries to retrieve all messages from the mailbox in one call to the `recv()` method. From the print-out, we can examine whether the peeks and retrieves behave as expected." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Conditional Wait on Mailboxes" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Similar to a store, the mailbox facility is not a trappable. That is, one cannot use the mailbox directly for conditional wait. The reasons are the same. First, a mailbox can have multiple partitions, and the `recv()` method can be blocked on receiving messages from any one of them. Second, the `recv()` method is expected to retrieve and return one or all of the messages depending on the 'isall' argument. To allow conditional wait, a mailbox provides the `receiver()` method, which returns the corresponding trappable for receiving messages from a designated mailbox partition. \n", "\n", "In the following example, we model the routine of a mailman and his patron. The overworked mailman is a process, who starts the day at 8 o'clock and sorts the mail until 2 o'clock in the afternoon. Then he starts to deliver the mail, which may take between one and five hours. We model this by sending a message to his patron's mailbox with a random delay. The patron is another process, who comes back from work at 5 o'clock every day. 
He performs a timed wait on the mailbox, which represents checking the mailbox until 6 o'clock. If the mail arrives before 6, the patron receives it. Otherwise, the patron gives up and will receive it the next day. " ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "--- day 0 ---\n", "14:00 mail truck's out, expected delivery at 16:53\n", "17:00 receives 'letter for day 0'\n", "--- day 1 ---\n", "14:00 mail truck's out, expected delivery at 18:20\n", "18:00 mail truck didn't come today\n", "--- day 2 ---\n", "14:00 mail truck's out, expected delivery at 15:02\n", "17:00 receives 'letter for day 1'\n", "17:00 receives 'letter for day 2'\n", "--- day 3 ---\n", "14:00 mail truck's out, expected delivery at 18:43\n", "18:00 mail truck didn't come today\n", "--- day 4 ---\n", "14:00 mail truck's out, expected delivery at 18:45\n", "17:00 receives 'letter for day 3'\n" ] } ], "source": [ "# %load '../examples/basics/mailman.py'\n", "from time import gmtime, strftime\n", "from random import seed, randint\n", "import simulus\n", "\n", "def strnow(t=None):\n", " if not t: t = sim.now\n", " return strftime(\"%H:%M\", gmtime(t))\n", "\n", "def mailman():\n", " day = 0\n", " while True:\n", " sim.sleep(until=day*24*3600+8*3600) # 8 o'clock\n", " print('--- day %d ---' % day)\n", " \n", " # sort the mail in the morning and get out for delivery at 2\n", " # o'clock in the afternoon\n", " sim.sleep(until=day*24*3600+14*3600)\n", "\n", " # it may take a variable amount of time (between 1 and 5 hours)\n", " # before the mail can be delivered to people's mailboxes\n", " delay = randint(3600, 5*3600)\n", " mb.send('letter for day %d' % day, delay)\n", " print(\"%s mail truck's out, expected delivery at %s\" %\n", " (strnow(), strnow(sim.now+delay)))\n", "\n", " # go to the next day\n", " day += 1\n", "\n", "def patron():\n", " day = 0\n", " while True:\n", " # come back from work at 5 PM\n", " 
sim.sleep(until=day*24*3600+17*3600)\n", " \n", " # check the mailbox within an hour (until 6 PM)\n", " rcv = mb.receiver()\n", " _, timedout = sim.wait(rcv, 3600)\n", " if timedout:\n", " print(\"%s mail truck didn't come today\" % strnow())\n", " else:\n", " for ltr in rcv.retval:\n", " print(\"%s receives '%s'\" % (strnow(), ltr))\n", "\n", " # go to the next day\n", " day += 1\n", " \n", "seed(12345)\n", "\n", "sim = simulus.simulator()\n", "mb = sim.mailbox()\n", "\n", "sim.process(mailman)\n", "sim.process(patron)\n", "\n", "sim.run(5*24*3600)\n" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.3" } }, "nbformat": 4, "nbformat_minor": 2 }