{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "## Inter-Process Communication" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Processes often need to interact with one another in order to accomplish tasks. In the previous homework example, we saw one of the simplest ways for processes to communicate: the `homework` process waits for the completion of all `student` processes. Simulus provides a rich set of mechanisms to support inter-process communication. We start by discussing the two basic primitives designed specifically for synchronization and communication among simulation processes: one is called a \"trap\" and the other is called a \"semaphore\"." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Traps" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Traps are one-time signaling mechanisms for inter-process communication. A trap has three states. It's \"unset\" when the trap is first created and nothing has happened to it. It's \"set\" when one or more processes are waiting for the trap to be triggered. It turns to \"sprung\" when the trap has been triggered, after which there will be no more processes waiting for the trap.\n", "\n", "The life of a trap is as follows. A trap starts in the \"unset\" state when it's created. When a process waits on a trap, the trap goes to \"set\"; in this state more processes may come and wait on the same trap, and the trap remains \"set\". When a process triggers a trap that is in the \"set\" state, *all* processes waiting on the trap are unblocked and resume execution (it's guaranteed that there is at least one waiting process when the trap is in the \"set\" state). The trap then moves to the \"sprung\" state. When a process triggers a trap that is in the \"unset\" state, the trap simply moves to the \"sprung\" state (since there are no processes currently waiting on the trap). 
If a trap has \"sprung\", further waiting on the trap is a no-op: a process that tries to wait on a \"sprung\" trap is not suspended. " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### One-Time Signaling" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "A trap is a very simple signaling mechanism. One or more processes can wait on a trap. When another process triggers the trap, all the waiting processes are released at once. It is important to know that a trap is for **one-time use** only. Once sprung, a trap cannot be triggered again; if a process tries, simulus raises an exception. That is, if a process wants to send multiple signals to other processes, it has to use multiple traps (or some other synchronization mechanism, as we will discuss later).\n", "\n", "The following example shows a simple use case for traps." ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "p0 starts at 10\n", "p1 starts at 11 and waits on trap\n", "p2 starts at 12 and waits on trap\n", "p3 starts at 13 and waits on trap\n", "p4 starts at 14 and waits on trap\n", "p5 starts at 15 and waits on trap\n", "p0 triggers the trap at 15\n", "p1 resumes at 15\n", "p2 resumes at 15\n", "p3 resumes at 15\n", "p4 resumes at 15\n", "p5 resumes at 15\n", "p6 starts at 16 and waits on trap\n", "p6 resumes at 16\n", "p7 starts at 17 and waits on trap\n", "p7 resumes at 17\n", "p8 starts at 18 and waits on trap\n", "p8 resumes at 18\n", "p9 starts at 19 and waits on trap\n", "p9 resumes at 19\n" ] } ], "source": [ "# %load \"../examples/basics/onetrap.py\"\n", "import simulus\n", "\n", "def p(idx):\n", "    if idx > 0:\n", "        print(\"p%d starts at %g and waits on trap\" % (idx, sim.now))\n", "        t.wait()\n", "        print(\"p%d resumes at %g\" % (idx, sim.now))\n", "    else:\n", "        print(\"p%d starts at %g\" % (idx, sim.now))\n",
"        sim.sleep(5)\n", "        print(\"p%d triggers the trap at %g\" % (idx, sim.now))\n", "        t.trigger()\n", "\n", "sim = simulus.simulator()\n", "t = sim.trap()\n", "for i in range(10):\n", "    sim.process(p, i, offset=10+i)\n", "sim.run()\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In this example, we create a trap using the simulator's `trap()` method. We create 10 processes, p0, p1, ... p9. We stagger them to start from time 10 to 19. Process p0 acts differently from the others: it sleeps for 5 and then triggers the trap. All the other processes, p1 to p9, simply wait on the trap. In the output, p1 to p5 start waiting before the trap is triggered at time 15 and are all released at that time; p6 to p9 arrive after the trap has sprung, so their waits return immediately. You can inspect the results from running this example to see whether the processes behave as you would expect." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Barriers" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "A barrier can be used to synchronize a group of processes. Each process must stop at the barrier and is not allowed to proceed until all processes in the group have reached the barrier. When the last process reaches the barrier, all processes resume execution and continue from the barrier.\n", "\n", "It is rather straightforward to implement barriers using traps. In the following example, we create such a barrier."
] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "p0 runs at 0 and sleeps for 0.538916\n", "p2 runs at 0 and sleeps for 0.0102212\n", "p6 runs at 0 and sleeps for 1.74415\n", "p9 runs at 0 and sleeps for 0.354734\n", "p5 runs at 0 and sleeps for 0.459518\n", "p1 runs at 0 and sleeps for 0.215251\n", "p4 runs at 0 and sleeps for 0.83473\n", "p3 runs at 0 and sleeps for 0.176365\n", "p8 runs at 0 and sleeps for 0.132694\n", "p7 runs at 0 and sleeps for 0.567284\n", "p2 reaches barrier at 0.0102212\n", "p8 reaches barrier at 0.132694\n", "p3 reaches barrier at 0.176365\n", "p1 reaches barrier at 0.215251\n", "p9 reaches barrier at 0.354734\n", "p5 reaches barrier at 0.459518\n", "p0 reaches barrier at 0.538916\n", "p7 reaches barrier at 0.567284\n", "p4 reaches barrier at 0.83473\n", "p6 reaches barrier at 1.74415\n", "p6 runs at 1.74415 and sleeps for 0.825716\n", "p2 runs at 1.74415 and sleeps for 0.191577\n", "p8 runs at 1.74415 and sleeps for 0.805691\n", "p3 runs at 1.74415 and sleeps for 0.438352\n", "p1 runs at 1.74415 and sleeps for 3.17163\n", "p9 runs at 1.74415 and sleeps for 0.0957338\n", "p5 runs at 1.74415 and sleeps for 3.84624\n", "p0 runs at 1.74415 and sleeps for 0.531231\n", "p7 runs at 1.74415 and sleeps for 0.701049\n", "p4 runs at 1.74415 and sleeps for 0.16034\n", "p9 reaches barrier at 1.83988\n", "p4 reaches barrier at 1.90449\n", "p2 reaches barrier at 1.93573\n", "p3 reaches barrier at 2.1825\n", "p0 reaches barrier at 2.27538\n", "p7 reaches barrier at 2.4452\n", "p8 reaches barrier at 2.54984\n", "p6 reaches barrier at 2.56987\n", "p1 reaches barrier at 4.91578\n", "p5 reaches barrier at 5.59039\n", "p5 runs at 5.59039 and sleeps for 1.26928\n", "p9 runs at 5.59039 and sleeps for 0.210686\n", "p4 runs at 5.59039 and sleeps for 0.417883\n", "p2 runs at 5.59039 and sleeps for 0.0238023\n", "p3 runs at 5.59039 and sleeps for 0.414785\n", "p0 runs at 5.59039 
and sleeps for 3.42598\n", "p7 runs at 5.59039 and sleeps for 3.85368\n", "p8 runs at 5.59039 and sleeps for 1.36465\n", "p6 runs at 5.59039 and sleeps for 0.00346059\n", "p1 runs at 5.59039 and sleeps for 2.81739\n", "p6 reaches barrier at 5.59385\n", "p2 reaches barrier at 5.61419\n", "p9 reaches barrier at 5.80107\n", "p3 reaches barrier at 6.00517\n", "p4 reaches barrier at 6.00827\n", "p5 reaches barrier at 6.85967\n", "p8 reaches barrier at 6.95504\n", "p1 reaches barrier at 8.40778\n", "p0 reaches barrier at 9.01636\n", "p7 reaches barrier at 9.44407\n", "p7 runs at 9.44407 and sleeps for 2.04614\n", "p6 runs at 9.44407 and sleeps for 1.47331\n", "p2 runs at 9.44407 and sleeps for 0.197078\n", "p9 runs at 9.44407 and sleeps for 0.104805\n", "p3 runs at 9.44407 and sleeps for 0.535345\n", "p4 runs at 9.44407 and sleeps for 2.1675\n", "p5 runs at 9.44407 and sleeps for 0.862954\n", "p8 runs at 9.44407 and sleeps for 1.33401\n", "p1 runs at 9.44407 and sleeps for 0.264775\n", "p0 runs at 9.44407 and sleeps for 0.741492\n", "p9 reaches barrier at 9.54887\n", "p2 reaches barrier at 9.64114\n", "p1 reaches barrier at 9.70884\n", "p3 reaches barrier at 9.97941\n" ] } ], "source": [ "# %load \"../examples/basics/barrier.py\"\n", "import simulus\n", "\n", "from random import seed, expovariate\n", "seed(12345)\n", "\n", "class Barrier(object):\n", "    def __init__(self, sim, total_procs):\n", "        self.sim = sim\n", "        self.total_procs = total_procs\n", "        # reset the barrier (by creating a new trap and\n", "        # resetting the count)\n", "        self.trap = self.sim.trap()\n", "        self.num = 0\n", "\n", "    def barrier(self):\n", "        self.num += 1\n", "        if self.num < self.total_procs:\n", "            # not yet all processes have reached the barrier\n", "            self.trap.wait()\n", "        else:\n", "            # the last process has reached the barrier\n", "            self.trap.trigger()\n", "            # reset the barrier for next time use (by\n", "            # creating a new trap and resetting the count)\n", "            self.trap = self.sim.trap()\n",
"            self.num = 0\n", "\n", "def p(idx):\n", "    while True:\n", "        t = expovariate(1)\n", "        print(\"p%d runs at %g and sleeps for %g\" % (idx, sim.now, t))\n", "        sim.sleep(t)\n", "        print(\"p%d reaches barrier at %g\" % (idx, sim.now))\n", "        bar.barrier()\n", "\n", "sim = simulus.simulator()\n", "bar = Barrier(sim, 10)\n", "for i in range(10):\n", "    sim.process(p, i)\n", "sim.run(10)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The `Barrier` class implements the barrier. One creates a barrier with two arguments: the simulator on which the processes are run and the total number of processes expected at the barrier. Internally, we create a trap for the synchronization and use a counter (called `num`) to keep track of the number of processes having reached the barrier so far. The counter is initialized to zero. Each time a process wants to use the barrier, it calls the `barrier()` method, which increments the counter. If the counter is smaller than the total number of processes, the process will be put on hold (using the trap's `wait()` method). Otherwise, if the counter gets to the total number of processes, the process is the last one among the group of processes to reach the barrier. Therefore, it calls the trap's `trigger()` method to release all the waiting processes that have earlier arrived at the barrier. Remember that traps can only be used once. To make the barrier reusable, we create a new trap each time the last process reaches the barrier. We also reset the counter.\n", "\n", "In this example, we create 10 processes. Each process waits for some random time (exponentially distributed) and then calls `barrier()`. Once the method returns, the process repeats the same: it waits for some random time and calls for the next barrier." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Semaphores" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Semaphores are multi-use signaling mechanisms for inter-process communication. 
They are the other primitive, besides traps, designed for synchronization and communication among simulation processes. \n", "\n", "In simulus, a semaphore implements what is commonly called a \"counting semaphore.\" Initially, a semaphore can have a nonnegative integer count, which indicates the number of available resources. The processes atomically increment the semaphore count to represent resources being added or returned to the pool (using the `signal()` method). Similarly, the processes atomically decrement the semaphore count to represent resources being removed from the pool (using the `wait()` method). When the semaphore count is zero, it means that there are no available resources. In that case, a process trying to decrement the semaphore (to remove a resource) will be blocked until more resources are added back to the pool. \n", "\n", "A semaphore differs from a trap. A trap is a one-time signaling mechanism. Multiple processes can wait on a trap. Once a process triggers the trap, *all* waiting processes will be unblocked. Moreover, a trap cannot be reused: once a trap is sprung, subsequent waits will not block the processes and the trap cannot be triggered again. In comparison, a semaphore is a multi-use signaling mechanism. Each time a process waits on a semaphore, the semaphore value will be decremented. If the value becomes negative, the process will be blocked. Each time one signals a semaphore, the semaphore value will be incremented. If there are blocked processes, *one* of these processes will be unblocked. Processes can use the same semaphore repeatedly." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Circular Wait" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The following example shows the use of semaphores to synchronize a group of processes.
" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "p0 wakes up at 0\n", "p1 wakes up at 0.538916\n", "p2 wakes up at 0.549138\n", "p3 wakes up at 2.29329\n", "p4 wakes up at 2.64802\n", "p5 wakes up at 3.10754\n", "p6 wakes up at 3.32279\n", "p7 wakes up at 4.15752\n", "p8 wakes up at 4.33388\n", "p9 wakes up at 4.46658\n", "p0 wakes up at 5.03386\n", "p1 wakes up at 5.85958\n", "p2 wakes up at 6.05115\n", "p3 wakes up at 6.85685\n", "p4 wakes up at 7.2952\n", "p5 wakes up at 10.4668\n", "p6 wakes up at 10.5626\n", "p7 wakes up at 14.4088\n", "p8 wakes up at 14.94\n", "p9 wakes up at 15.6411\n", "p0 wakes up at 15.8014\n", "p1 wakes up at 17.0707\n", "p2 wakes up at 17.2814\n", "p3 wakes up at 17.6993\n", "p4 wakes up at 17.7231\n", "p5 wakes up at 18.1379\n" ] } ], "source": [ "# %load \"../examples/basics/circular.py\"\n", "import simulus\n", "\n", "from random import seed, expovariate\n", "seed(12345)\n", "\n", "def p(idx):\n", "    while True:\n", "        sems[idx].wait()\n", "        print(\"p%d wakes up at %g\" % (idx, sim.now))\n", "        sim.sleep(expovariate(1))\n", "        sems[(idx+1)%total_procs].signal()\n", "\n", "sim = simulus.simulator()\n", "\n", "total_procs = 10\n", "sems = [sim.semaphore() for _ in range(total_procs)]\n", "for i in range(10):\n", "    sim.process(p, i)\n", "sems[0].signal()\n", "sim.run(20)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In this example, ten processes are organized in a circle. Each process is paired with its own semaphore; it waits on that semaphore, then signals the semaphore of the next process in the circle, and then repeats. As a result, the processes execute one at a time in a round-robin fashion.\n", "\n", "The use of semaphores in this case is very much like traps, since there is at most one process waiting on a semaphore at any given time. 
Whether the semaphore unblocks just one waiting process at a time, or the trap unblocks all the waiting processes at once does not really matter in this case. However, a semaphore can be reused, while a trap cannot. If we used traps for this example, we would have to create a new trap each time a process wakes up (similar to what we did for the barrier example)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Producer-Consumer Problem" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We should have learned from our Operating Systems class about the producer-consumer problem (also known as the bounded-buffer problem). It's a classic scenario for multi-process synchronization. In its simplest form, the problem consists of two processes. A producer process repeatedly generates data and puts it into a shared, fixed-size buffer. A consumer consumes the data by removing it from the buffer one item at a time. The problem is that the producer cannot put data into the buffer if the buffer is already full. Similarly, the consumer cannot remove data from the buffer if the buffer is empty.\n", "\n", "One solution to the producer-consumer problem is to use semaphores, as shown in the following example. Later we will show an even simpler way to solve this problem (using `store` provided by simulus)."
] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "0.010221: p[1] produces item [0]\n", "0.010221: p[1] stores item [0] in buffer\n", "0.010221: c[0] retrieves item [0] from buffer\n", "0.538916: p[0] produces item [1]\n", "0.538916: p[0] stores item [1] in buffer\n", "0.538916: c[2] retrieves item [1] from buffer\n", "0.752533: c[0] consumes item[0]\n", "0.754168: p[0] produces item [2]\n", "0.754168: p[0] stores item [2] in buffer\n", "0.754168: c[1] retrieves item [2] from buffer\n", "1.521765: c[2] consumes item[1]\n", "1.588897: p[0] produces item [3]\n", "1.588897: p[0] stores item [3] in buffer\n", "1.588897: c[0] retrieves item [3] from buffer\n", "1.608449: c[1] consumes item[2]\n", "1.754371: p[1] produces item [4]\n", "1.754371: p[1] stores item [4] in buffer\n", "1.754371: c[2] retrieves item [4] from buffer\n", "2.156181: p[0] produces item [5]\n", "2.156181: p[0] stores item [5] in buffer\n", "2.156181: c[1] retrieves item [5] from buffer\n", "2.476470: c[0] consumes item[3]\n", "2.580087: p[1] produces item [6]\n", "2.580087: p[1] stores item [6] in buffer\n", "2.580087: c[0] retrieves item [6] from buffer\n", "2.594533: p[0] produces item [7]\n", "2.594533: p[0] stores item [7] in buffer\n", "2.670563: c[2] consumes item[4]\n", "2.670563: c[2] retrieves item [7] from buffer\n", "3.125764: p[0] produces item [8]\n", "3.125764: p[0] stores item [8] in buffer\n", "3.181913: c[1] consumes item[5]\n", "3.181913: c[1] retrieves item [8] from buffer\n", "3.771587: c[2] consumes item[7]\n", "3.826813: p[0] produces item [9]\n", "3.826813: p[0] stores item [9] in buffer\n", "3.826813: c[2] retrieves item [9] from buffer\n", "3.846008: c[0] consumes item[6]\n", "4.037499: p[0] produces item [10]\n", "4.037499: p[0] stores item [10] in buffer\n", "4.037499: c[0] retrieves item [10] from buffer\n", "4.172205: c[1] consumes item[8]\n", "4.455382: p[0] produces item [11]\n",
"4.455382: p[0] stores item [11] in buffer\n", "4.455382: c[1] retrieves item [11] from buffer\n", "4.882414: c[2] consumes item[9]\n", "5.017675: c[0] consumes item[10]\n", "5.282205: c[1] consumes item[11]\n", "5.751716: p[1] produces item [12]\n", "5.751716: p[1] stores item [12] in buffer\n", "5.751716: c[2] retrieves item [12] from buffer\n", "6.551144: c[2] consumes item[12]\n", "7.881357: p[0] produces item [13]\n", "7.881357: p[0] stores item [13] in buffer\n", "7.881357: c[0] retrieves item [13] from buffer\n", "8.664728: c[0] consumes item[13]\n", "9.605397: p[1] produces item [14]\n", "9.605397: p[1] stores item [14] in buffer\n", "9.605397: c[1] retrieves item [14] from buffer\n" ] } ], "source": [ "# %load \"../examples/basics/boundbuf.py\"\n", "import simulus\n", "\n", "from random import seed, expovariate, gauss\n", "seed(12345)\n", "\n", "bufsiz = 5  # buffer capacity\n", "items_produced = 0  # keep track of the number of items produced\n", "items_consumed = 0  # ... and consumed for ALL producers and consumers\n", "num_producers = 2  # number of producers\n", "num_consumers = 3  # number of consumers\n", "\n", "def producer(idx):\n", "    global items_produced\n", "    while True:\n", "        sim.sleep(expovariate(1))  # take time to produce an item\n", "        num = items_produced\n", "        items_produced += 1\n", "        print(\"%f: p[%d] produces item [%d]\" % (sim.now, idx, num))\n", "        sem_avail.wait()  # require a free slot in buffer\n", "        sem_occupy.signal()  # store the item and increase occupancy\n", "        print(\"%f: p[%d] stores item [%d] in buffer\" %\n", "              (sim.now, idx, num))\n", "\n", "def consumer(idx):\n", "    global items_consumed\n", "    while True:\n", "        sem_occupy.wait()  # require an item from buffer\n", "        sem_avail.signal()  # retrieve the item and bump the free slots\n", "        num = items_consumed\n", "        items_consumed += 1\n", "        print(\"%f: c[%d] retrieves item [%d] from buffer\" %\n", "              (sim.now, idx, num))\n", "        sim.sleep(gauss(0.8, 0.2))  # take time to consume the item\n",
"        print(\"%f: c[%d] consumes item[%d]\" % (sim.now, idx, num))\n", "\n", "sim = simulus.simulator()\n", "sem_avail = sim.semaphore(bufsiz)  # available slots\n", "sem_occupy = sim.semaphore(0)  # no items yet\n", "for i in range(num_producers):\n", "    sim.process(producer, i)\n", "for i in range(num_consumers):\n", "    sim.process(consumer, i)\n", "sim.run(10)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We use two semaphores: `sem_avail` counts the number of free slots in the buffer, and `sem_occupy` counts the number of produced items stored in the buffer. \n", "\n", "A producer sleeps for an exponentially distributed random time to represent the production of an item, then calls `wait()` on the semaphore `sem_avail` to decrement the number of available slots in the buffer. The process may be blocked if there are no free slots available, in which case it waits for a consumer to retrieve an item and thereby create a free slot. Once a slot is available, the producer adds the item to the buffer, which is represented by calling `signal()` on the semaphore `sem_occupy`; this increments the number of items in the buffer (and could potentially unblock a waiting consumer process). The producer then repeats.\n", "\n", "A consumer calls `wait()` on the semaphore `sem_occupy` to decrement the number of items stored in the buffer. The process may be blocked if there are currently no items in the buffer. In this case, the process will wait until a producer adds an item. Otherwise, the consumer retrieves the item, which is represented by calling `signal()` on the semaphore `sem_avail`; this increments the number of available slots in the buffer and could potentially unblock a waiting producer process. The consumer process then consumes the item by sleeping for a normally distributed random time. 
The consumer then repeats.\n", "\n", "In this example, we create two producer and three consumer processes, and use the two semaphores to synchronize the processes to access the bounded buffer. If you're familiar with the performance modeling literature, you may recognize that we are actually simulating a multi-server queue with limited capacity." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Queuing Disciplines" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "If multiple processes are waiting on a semaphore, the order in which the processes are unblocked may be important. By default, a semaphore applies the FIFO order (first in first out). That is, the first process which got blocked on the semaphore will be the first one unblocked. \n", "\n", "Other queuing disciplines are also possible, including LIFO (last in first out), SIRO (service in random order), and PRIORITY (which is based on the 'priority' of the processes: a lower value means higher priority). One can choose a queuing discipline when the semaphore is created. The queuing disciplines are constants defined in the `QDIS` class.\n", "\n", "In the following example, we show the use of different queuing disciplines." 
] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "----------------------------------------\n", "p[id=0,prio=3.2] resumes at 10.000000\n", "p[id=1,prio=2.2] resumes at 10.000000\n", "p[id=2,prio=1.2] resumes at 10.000000\n", "p[id=3,prio=0.2] resumes at 10.000000\n", "p[id=4,prio=0.8] resumes at 10.000000\n", "p[id=5,prio=1.8] resumes at 10.000000\n", "p[id=6,prio=2.8] resumes at 10.000000\n", "p[id=7,prio=3.8] resumes at 10.000000\n", "p[id=8,prio=4.8] resumes at 10.000000\n", "p[id=9,prio=5.8] resumes at 10.000000\n", "----------------------------------------\n", "p[id=9,prio=5.8] resumes at 110.000000\n", "p[id=8,prio=4.8] resumes at 110.000000\n", "p[id=7,prio=3.8] resumes at 110.000000\n", "p[id=6,prio=2.8] resumes at 110.000000\n", "p[id=5,prio=1.8] resumes at 110.000000\n", "p[id=4,prio=0.8] resumes at 110.000000\n", "p[id=3,prio=0.2] resumes at 110.000000\n", "p[id=2,prio=1.2] resumes at 110.000000\n", "p[id=1,prio=2.2] resumes at 110.000000\n", "p[id=0,prio=3.2] resumes at 110.000000\n", "----------------------------------------\n", "p[id=3,prio=0.2] resumes at 210.000000\n", "p[id=0,prio=3.2] resumes at 210.000000\n", "p[id=5,prio=1.8] resumes at 210.000000\n", "p[id=9,prio=5.8] resumes at 210.000000\n", "p[id=2,prio=1.2] resumes at 210.000000\n", "p[id=8,prio=4.8] resumes at 210.000000\n", "p[id=7,prio=3.8] resumes at 210.000000\n", "p[id=6,prio=2.8] resumes at 210.000000\n", "p[id=4,prio=0.8] resumes at 210.000000\n", "p[id=1,prio=2.2] resumes at 210.000000\n", "----------------------------------------\n", "p[id=3,prio=0.2] resumes at 310.000000\n", "p[id=4,prio=0.8] resumes at 310.000000\n", "p[id=2,prio=1.2] resumes at 310.000000\n", "p[id=5,prio=1.8] resumes at 310.000000\n", "p[id=1,prio=2.2] resumes at 310.000000\n", "p[id=6,prio=2.8] resumes at 310.000000\n", "p[id=0,prio=3.2] resumes at 310.000000\n", "p[id=7,prio=3.8] resumes at 310.000000\n", "p[id=8,prio=4.8] 
resumes at 310.000000\n", "p[id=9,prio=5.8] resumes at 310.000000\n" ] } ], "source": [ "# %load \"../examples/basics/qdis.py\"\n", "import simulus\n", "\n", "# so that we get the same result from random priority\n", "from random import seed\n", "seed(12345)\n", "\n", "def p(idx, sem):\n", "    # set the priority of the current process (this is only useful\n", "    # if we use PRIORITY qdis)\n", "    sim.set_priority(abs(idx-3.2))\n", "\n", "    # make sure the processes wait on the semaphore in order\n", "    sim.sleep(idx)\n", "\n", "    # the process will block on the semaphore and then print out\n", "    # a message when it is unblocked\n", "    sem.wait()\n", "    print(\"p[id=%d,prio=%.1f] resumes at %f\" %\n", "          (idx, sim.get_priority(), sim.now))\n", "\n", "def trywaits(sem):\n", "    # create ten processes which will all block on the semaphore\n", "    for i in range(10):\n", "        sim.process(p, idx=i, sem=sem)\n", "    sim.sleep(10)\n", "\n", "    # release them all and check the order they are unblocked\n", "    print('-'*40)\n", "    for i in range(10):\n", "        sem.signal()\n", "\n", "sim = simulus.simulator()\n", "s1 = sim.semaphore()\n", "s2 = sim.semaphore(qdis=simulus.QDIS.LIFO)\n", "s3 = sim.semaphore(qdis=simulus.QDIS.SIRO)\n", "s4 = sim.semaphore(qdis=simulus.QDIS.PRIORITY)\n", "sim.process(trywaits, s1, offset=0)\n", "sim.process(trywaits, s2, offset=100)\n", "sim.process(trywaits, s3, offset=200)\n", "sim.process(trywaits, s4, offset=300)\n", "sim.run()\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We create four semaphores, each with a different queuing discipline. This is achieved by passing the `qdis` argument to the simulator's `semaphore()` method that creates the semaphores. To use the priority-based queuing discipline, we use the simulator's `set_priority()` method to set the current process's priority. \n", "\n", "For each of the four semaphores, we create a `trywaits` process, which then creates another 10 processes that wait on the semaphore. 
Then the `trywaits` process will release all the waiting processes one by one. From the print-out, we should be able to determine the order in which the waiting processes are unblocked. " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Trappables and Conditional Waits" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Both traps and semaphores are called **trappables**. An event (i.e., a scheduled function) is a trappable. A process is also a trappable. Simulus provides a very powerful function called `wait()` to block the calling process until any or all of the given trappables are triggered, or until a pre-specified amount of time has elapsed. When we say a trappable is \"triggered\", we mean that the wait condition on the trappable has been satisfied. If it's a trap, it means the trap has been triggered by another process. If it's a semaphore, it means a wait on the semaphore has returned. If it's an event, it means the event has happened (and the event handler has been invoked). If it's a process, it means the process has terminated. \n", "\n", "In the case of a trap, if we have only one trap, say `t`, calling the wait method of the trap, `t.wait()`, is equivalent to calling the simulator's `wait()` function with the trap passed as the argument: `sim.wait(t)`. Similarly, in the case of a semaphore, if we have only one semaphore, say `s`, `s.wait()` is equivalent to `sim.wait(s)`. \n", "\n", "The simulator's `wait()` function expects the argument to be either one trappable or a list of trappables. If it takes more than one trappable in a list or tuple, the calling process will be blocked until either *one* of the trappables or *all* of the trappables are triggered. The behavior depends on the `method` argument: if it's `any`, the wait condition is satisfied as soon as one of the trappables is triggered; if it's `all`, the process needs to wait until all trappables are triggered. 
If the method argument is not provided, simulus assumes it's 'all' by default." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Waiting on Multiple Trappables" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The following shows an example of a process waiting for multiple trappables." ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "p1 triggers t1 at 10\n", "p1 triggers s1 at 20\n", "p2 resumes at 20 (ret=[True, True])\n", "p1 triggers t2 at 30\n", "p2 resumes at 30 (ret=[True, False])\n", "p1 triggers s2 at 40\n", "p2 resumes at 40 (ret=[True])\n" ] } ], "source": [ "# %load \"../examples/basics/multiwait.py\"\n", "import simulus\n", "\n", "def p1():\n", "    sim.sleep(10)\n", "    print(\"p1 triggers t1 at %g\" % sim.now)\n", "    t1.trigger()\n", "\n", "    sim.sleep(10)\n", "    print(\"p1 triggers s1 at %g\" % sim.now)\n", "    s1.trigger() # signal and trigger are aliases for semaphore\n", "\n", "    sim.sleep(10)\n", "    print(\"p1 triggers t2 at %g\" % sim.now)\n", "    t2.trigger()\n", "\n", "    sim.sleep(10)\n", "    print(\"p1 triggers s2 at %g\" % sim.now)\n", "    s2.signal()\n", "\n", "def p2():\n", "    tp = (t1, s1)\n", "    r, _ = sim.wait(tp)\n", "    print(\"p2 resumes at %g (ret=%r)\" % (sim.now, r))\n", "\n", "    tp = [t2, s2]\n", "    r, _ = sim.wait(tp, method=any)\n", "    print(\"p2 resumes at %g (ret=%r)\" % (sim.now, r))\n", "\n", "    # find the remaining untriggered trappables (using the\n", "    # returned mask) and wait for them all\n", "    tp = [t for i, t in enumerate(tp) if not r[i]]\n", "    r, _ = sim.wait(tp)\n", "    print(\"p2 resumes at %g (ret=%r)\" % (sim.now, r))\n", "\n", "sim = simulus.simulator()\n", "t1 = sim.trap()\n", "t2 = sim.trap()\n", "s1 = sim.semaphore()\n", "s2 = sim.semaphore()\n", "sim.process(p1)\n", "sim.process(p2)\n", "sim.run()\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In this example, we create two traps, t1 and t2, and two 
semaphores, s1 and s2. We also create two processes. The process p1 triggers a trap or a semaphore at 10-second intervals. The other process p2 first waits on a trap and a semaphore using the default method ('all'). As expected, the process only resumes execution once both trappables have been triggered. The p2 process then waits on the other trap and semaphore using the `any` method. In this case, when one trappable is triggered, the process will resume execution. It then filters out the triggered trappables from the list and creates another list with the remaining untriggered trappables (actually there's only one left), and then waits until all of the remaining trappables are triggered.\n", "\n", "The return from the `wait()` function needs a bit more explanation. The `wait()` function actually returns a tuple with two elements: the first element of the tuple indicates whether the trappables have been triggered or not; and the second element of the tuple indicates whether a timeout has occurred. Since in this example we don't use a timed wait, the `wait()` function always returns False in the second element.\n", "\n", "If the first argument when calling the `wait()` function is a single trappable (not in a list or tuple), the first element of the returned tuple will simply be a boolean (True or False), indicating whether the trappable has been triggered or not upon the return of the function. If, on the other hand, the first argument when calling the `wait()` function is a list or a tuple of trappables (even with just one trappable), the first element of the returned tuple will be a list of booleans, where each element of the list indicates whether the corresponding trappable has been triggered.\n", "\n", "In the example, when the `wait()` function is called the first time, it returns a list with True and True for the first element of the returned tuple, since both trappables must be triggered before the process can resume execution (because of the 'all' method). 
The second time, the function returns True and False: only the first trappable (the trap t2) has been triggered at that time. Because the method is 'any', one triggered trappable is enough to satisfy the wait condition. The third time, the remaining trappable (semaphore s2) is triggered and the function returns a list consisting of only one True element.\n", "\n", "We can optionally provide an 'offset' or an 'until' argument to the `wait()` function. The 'offset' is the maximum amount of time, relative to the current time, that the process will be put on hold; if provided, it must be a non-negative value. The 'until' is the absolute time at which the process is expected to resume execution at the latest; if provided, it must not be earlier than the current time. Either 'offset' or 'until' can be provided, but not both. If both 'offset' and 'until' are omitted (as in the previous examples), there will be no time limit on the conditional wait." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### A Race between Tom and Jerry" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The ability to perform conditional wait on multiple trappables makes a process-oriented model quite expressive. The following shows a good example. Tom and Jerry decide to enter a race. Tom is modeled by processes. Each time Tom enters the race, we create a process, which calls `sleep()` to represent the time duration for the run. The time duration is a random variable from a normal distribution with a mean of 100 and a standard deviation of 50 (with negative values clamped to zero). Jerry is modeled by events. Each time Jerry enters the race, we schedule an event using `sched()` with a time offset representing the time duration for the run. The time duration is a random variable from a uniform distribution between 50 and 150. Tom and Jerry compete ten times; the next race starts as soon as the previous one finishes. For each race, whoever runs the fastest wins. 
But if neither finishes within 100 time units, both are disqualified for that race. The simulation determines who wins more races overall." ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "<-- competition starts at 0 -->\n", "77.5459: jerry finished\n", "77.5459: jerry wins\n", "<-- competition starts at 77.5459 -->\n", "171.749: jerry finished\n", "171.749: jerry wins\n", "<-- competition starts at 171.749 -->\n", "271.749: both disqualified\n", "<-- competition starts at 271.749 -->\n", "357.072: tom finished\n", "357.072: tom wins\n", "<-- competition starts at 357.072 -->\n", "430.387: tom finished\n", "430.387: tom wins\n", "<-- competition starts at 430.387 -->\n", "485.297: tom finished\n", "485.297: tom wins\n", "<-- competition starts at 485.297 -->\n", "585.297: both disqualified\n", "<-- competition starts at 585.297 -->\n", "611.838: tom finished\n", "611.838: tom wins\n", "<-- competition starts at 611.838 -->\n", "711.838: both disqualified\n", "<-- competition starts at 711.838 -->\n", "811.838: both disqualified\n", "final result: tom:jerry=4:2\n" ] } ], "source": [ "# %load \"../examples/basics/tomjerry.py\"\n", "import simulus\n", "\n", "from random import seed, gauss, uniform\n", "seed(321)\n", "\n", "def tom():\n", "    sim.sleep(max(0, gauss(100, 50)))\n", "    print(\"%g: tom finished\" % sim.now)\n", "\n", "def jerry():\n", "    print(\"%g: jerry finished\" % sim.now)\n", "\n", "def compete():\n", "    tom_won, jerry_won = 0, 0\n", "    for _ in range(10):\n", "        print(\"<-- competition starts at %g -->\" % sim.now)\n", "\n", "        p = sim.process(tom) # run, tom, run!\n", "        e = sim.sched(jerry, offset=uniform(50, 150)) # run, jerry, run!\n", "\n", "        # let the race begin...\n", "        (r1, r2), timedout = sim.wait((p, e), 100, method=any)\n", "        if timedout:\n", "            print(\"%g: both disqualified\" % sim.now)\n", "            sim.cancel(p)\n", "            sim.cancel(e)\n", "        elif r1:\n", "            print(\"%g: tom wins\" % 
sim.now)\n", "            tom_won += 1\n", "            sim.cancel(e)\n", "        else:\n", "            print(\"%g: jerry wins\" % sim.now)\n", "            jerry_won += 1\n", "            sim.cancel(p)\n", "    print(\"final result: tom:jerry=%d:%d\" % (tom_won, jerry_won))\n", "\n", "sim = simulus.simulator()\n", "sim.process(compete)\n", "sim.run()\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In the previous example, we showed traps and semaphores as trappables on which we can perform conditional wait. In this example, we show that both events and processes are also trappables. An instance of the process is returned from the `process()` function, and an instance of the event is returned from the `sched()` function. Both should be treated as opaque objects. That is, users are not expected to inspect the content of a process or event; rather, they should simply use the references. A process is a trappable, which is triggered when the process terminates. An event is also a trappable, which is triggered upon the activation of the event (i.e., when the event handler is invoked).\n", "\n", "In this example, we call the simulator's `wait()` function to wait for 'any' of the trappables to be triggered. We also provide a time limit of 100 (we pass 'offset' as a positional argument). We look at the return value from the `wait()` function to determine the outcome of the race. If the wait times out, we need to kill the process and cancel the event. Both are done by calling the simulator's `cancel()` method. 
If Tom wins (when p is triggered), we just cancel the event; and if Jerry wins (when e is triggered), we instead kill the process.\n" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.3" } }, "nbformat": 4, "nbformat_minor": 2 }