Threads

One way to reduce context switch time is to run lightweight processes within the same address space. This abstraction is often called a thread. Look at Figure 12-1.

Global variables and resources are shared between threads within the same process. Each thread has its own stack.

What system uses threads? Mach, Xinu, OSF/1 (Digital Unix), Java Virtual Machine, almost all new OSes.

Threads vs. Processes

Notes

Context Switching Among Threads

Synchronization primitives:

Scheduling

Can use either preemptive or non-preemptive scheduling. Why one over the other? Same issue as with process scheduling? Closer coordination. Preemptive scheduling does not seem as crucial.

User vs. Kernel threads

Fig 12-8 (from perspective of user threads):
+
can implement on a system not supporting kernel threads
+
fast creation of threads (kernel not involved)
+
fast switching between threads (kernel not involved)
+
customized scheduling algorithm
-
must have jackets around system calls that may block
-
no clock interrupts for time slicing
-
use threads when there are many system calls, not much more work to switch threads in the kernel.
-
do not gain on a multiprocessor.
-
for all threads, worry about non-reentrant code (errno).

Have pthreads package on our system (must use cc and c++ to compile). Run-time library.

Mutex Example

/* in /cs/cs3013/public/example/mutexthr.c */
#include <stdio.h>
#include <pthread.h>

/* cc -o mutexthr mutexthr.c -lpthread */

pthread_mutex_t mutex;  /* mutex id */
main()
{
    pthread_t idA, idB; /* ids of threads */
    void *MyThread(void *);

    if (pthread_mutex_init(&mutex, NULL) < 0) {
        perror("pthread_mutex_init");
        exit(1);
    }
    if (pthread_create(&idA, NULL, MyThread, (void *)"A") != 0) {
        perror("pthread_create");
        exit(1);
    }
    if (pthread_create(&idB, NULL, MyThread, (void *)"B") != 0) {
        perror("pthread_create");
        exit(1);
    }
    (void)pthread_join(idA, NULL);
    (void)pthread_join(idB, NULL);
    (void)pthread_mutex_destroy(&mutex);
}        

int x = 0;  /* global shared variable */

void *MyThread(void *arg)
{
    char *sbName;

    sbName = (char *)arg;
    IncrementX();
    printf("X = %d in Thread %s\n", x, sbName);
}    

IncrementX()
{
    int Temp; /* local variable */

    BeginRegion();  /* enter critical region */
    Temp = x;
    Temp = Temp + 1;
    x = Temp;
    EndRegion();   /* exit critical region */
}    

BeginRegion()
{
    pthread_mutex_lock(&mutex);
}

EndRegion()
{
    pthread_mutex_unlock(&mutex);
}

Synchronization Example

/* in /cs/cs3013/public/example/pcthreads.c */
#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>

/* cc -o pcthreads pcthreads.c -lpthread -lrt */

sem_t produced, consumed; /* semaphores */
int n;
main()
{
    pthread_t idprod, idcons; /* ids of threads */
    void *produce(void *);
    void *consume(void *);
    int loopcnt = 5;

    n = 0;
    if (sem_init(&consumed, 0, 0) < 0) {
        perror("sem_init");
        exit(1);
    }
    if (sem_init(&produced, 0, 1) < 0) {
        perror("sem_init");
        exit(1);
    }
    if (pthread_create(&idprod, NULL, produce, (void *)loopcnt) != 0) {
        perror("pthread_create");
        exit(1);
    }
    if (pthread_create(&idcons, NULL, consume, (void *)loopcnt) != 0) {
        perror("pthread_create");
        exit(1);
    }
    (void)pthread_join(idprod, NULL);
    (void)pthread_join(idcons, NULL);
    (void)sem_destroy(&produced);
    (void)sem_destroy(&consumed);
}        

void *produce(void *arg)
{
    int i, loopcnt;
    loopcnt = (int)arg;
    for (i=0; i<loopcnt; i++) {
        sem_wait(&consumed);
        n++;  /* increment n by 1 */
        sem_post(&produced);
    }
}       

void *consume(void *arg)
{
    int i, loopcnt;
    loopcnt = (int)arg;
    for (i=0; i<loopcnt; i++) {
        sem_wait(&produced);
        printf("n is %d\n", n); /* print value of n */
        sem_post(&consumed);
    }
}

Synchronization Example (C++)

/* in /cs/cs3013/public/example/pcthreads.C */
#include <iostream.h>
#include <pthread.h>
#include <semaphore.h>

/* c++ -o pcthreads pcthreads.C -lpthread -lrt */

sem_t produced, consumed; /* semaphores */
int n;
main()
{
    pthread_t idprod, idcons; /* ids of threads */
    void *produce(void *);
    void *consume(void *);
    int loopcnt = 5;

    n = 0;
    if (sem_init(&consumed, 0, 0) < 0) {
        perror("sem_init");
        exit(1);
    }
    if (sem_init(&produced, 0, 1) < 0) {
        perror("sem_init");
        exit(1);
    }
    if (pthread_create(&idprod, NULL, produce, (void *)loopcnt) != 0) {
        perror("pthread_create");
        exit(1);
    }
    if (pthread_create(&idcons, NULL, consume, (void *)loopcnt) != 0) {
        perror("pthread_create");
        exit(1);
    }
    (void)pthread_join(idprod, NULL);
    (void)pthread_join(idcons, NULL);
    (void)sem_destroy(&produced);
    (void)sem_destroy(&consumed);
}        

void *produce(void *arg)
{
    int i, loopcnt;
    loopcnt = (int)arg;
    for (i=0; i<loopcnt; i++) {
        sem_wait(&consumed);
        n++;  /* increment n by 1 */
        sem_post(&produced);
    }
}       

void *consume(void *arg)
{
    int i, loopcnt;
    loopcnt = (int)arg;
    for (i=0; i<loopcnt; i++) {
        sem_wait(&produced);
        cout << "n is " << n << "\n"; /* print value of n */
        sem_post(&consumed);
    }
}

Java Threads

Java has a pre-defined thread object. Can write new threaded objects by extending this class in Java. Threads can be assigned different priorities.

Mutex Example (Java)

// in /cs/cs3013/public/example/MutexThr.java

// javac MutexThr.java
// java MutexThr

public class MutexThr {
    public static void main(String[] args) {
        SharedX xobj = new SharedX();  // create shared object
        MyThread thrA = new MyThread("A", xobj); // create A thread
        MyThread thrB = new MyThread("B", xobj); // create B thread
        
        thrA.start();                // start A thread
        thrB.start();                // start B thread
        try {
            thrA.join();                // wait for A to finish
            thrB.join();                // wait for B to finish
        } catch (InterruptedException e) {
            System.out.println("Join interrupted");
        }
    }
}

// in /cs/cs3013/public/example/MyThread.java

public class MyThread extends Thread {
    private String myName;        // string identifier
    private SharedX xobj;        // reference to shared object

    public MyThread(String whoami, SharedX xobjarg) {
        myName = whoami;
        xobj = xobjarg;
    }

    public void run() {
        xobj.IncrementX();
        System.out.println("X = " + xobj.ReadX() + " in Thread " + myName);
    }
}

// in /cs/cs3013/public/example/SharedX.java

public class SharedX {
    public int x;

    public SharedX() {
        x = 0;                        // initialize X to zero
    }

    // use synchronized to prohibit concurrent access of x
    public synchronized void IncrementX() {
        int Temp; // local variable

        Temp = x;
        Temp = Temp + 1;
        x = Temp;

    }

    public synchronized int ReadX() {
        return x;                // return current value of x
    }
}

Synchronization Example (Java)

// in /cs/cs3013/public/example/PCExample.java

// javac PCExample.java
// java PCExample

public class PCExample {
    public static void main(String[] args) {
        SharedN nobj = new SharedN();  // create shared object
        PCThread thrP = new PCThread("P", nobj); // create producer thread
        PCThread thrC = new PCThread("C", nobj); // create consumer thread
        
        thrP.start();                // start producer thread
        thrC.start();                // start consumer thread
        try {
            thrP.join();                // wait for producer to finish
            thrC.join();                // wait for consumer to finish
        } catch (InterruptedException e) {
            System.out.println("Join interrupted");
        }
    }
}

// in /cs/cs3013/public/example/PCThread.java

public class PCThread extends Thread {
    private String myName;        // string identifier
    private SharedN nobj;        // reference to shared object

    public PCThread(String whoami, SharedN nobjarg) {
        myName = whoami;
        nobj = nobjarg;
    }

    public void run() {
        int i;
        int loopcnt = 5;

        if (myName.equals("P")) {
            // producer thread
            for (i=0; i<loopcnt; i++) {
                nobj.IncrementN();
            }
        }
        else {
            // consumer thread
            for (i=0; i<loopcnt; i++) {
                System.out.println("n is " + nobj.ReadN());
            }
        }
    }
}
// in /cs/cs3013/public/example/SharedN.java

public class SharedN {
    public int n;
    int cValueAvail = 1;        // initially use one value

    public SharedN() {
        n = 0;                        // initialize N to zero
    }

    // use synchronized and conditions to control access to N
    public synchronized void IncrementN() {
        while (cValueAvail > 0) {
            try {
                wait();   // wait for value to be consumed
            } catch (InterruptedException e) {
                System.out.println("Wait interrupted");
            }
        }
        n++;
        cValueAvail = 1;
        notifyAll();  // could also use notify() if only one is waiting
    }

    public synchronized int ReadN() {
        while (cValueAvail == 0) {
            try {
                wait();   // wait for value to be produced
            } catch (InterruptedException e) {
                System.out.println("Wait interrupted");
            }
        }
        cValueAvail = 0;
        notifyAll();  // could also use notify() if only one is waiting
        return n;  // return value of n (will continue after notify)
    }

}