黑马程序员--Java多线程,黑马--java多线程
黑马程序员--Java多线程,黑马--java多线程
---------------------- android培训、java培训、期待与您交流! ------------------------------------------------------------------------------------------------------------------------------------------------
Java中的线程以及线程并发库
进程和线程的概念:
进程是资源分配的基本单位。进程可以分为多个线程,线程是程序执行的基本单位,也就是基本的控制单元。
一个进程中至少有一个线程。
一个进程中至少有一个线程。
JVM启动的时候,会有一个进程,叫做java.exe.
该进程中至少有一个线程负责java程序的执行。而这个线程运行的代码存在于main()方法中。
该线程称之为主线程。
多线程的意义:
使多个部分的代码同时执行。主要是优化程序,提高效率是其次。
Thread类的工作原理:
以下是部分源码
如果是直接创建Thread对象的话,new Thread();因为target=null,所 以执行run 方 法 的 时 候,
直接就完了,没有关连想要运行的代码。这个时候,如果想关联代码的话,就要创 建 Thread 的
子类,重写run方法。从Thread类的源代码可以看出,其实这两种方式都是在 调用Thread 对 象 的
run方法,如果Thread类的run方法没有被覆盖,并且为该Thread对象设置了一个 Runnable对 象,
该run方法会调用Runnable对象的run方法。如果在Thread子类覆盖的run方法中编写了运行代 码,
也为Thread子类对象传递了一个Runnable对象,那么,线程运行时的执行代码是子类的run 法
的代码?还是Runnable对象的run方法的代码?涉及到的一个以往知识点:匿名内部类对象的 构
造方法如何调用父类的非默认构造方法。这个时候运行的是子类中的run方法,而不是 Runnable
中的run方法。
线程的几种状态以及状态之间的转换:
这里的原理图只是主要的情况,不全面。
当线程创建后,如果进入内存,但是没有分配到CPU,就进入了就绪状态,如果分配到了CPU,就进入了运行状态。
如果就绪状态获取cpu就,就变为了运行状态,就绪态是什么都具备了,就是没有获取CPU的执行权。
如果运行状态遇到sleep(),Wait()方法后,将进入了冻结状态(阻塞状态,不具备IO或者什么的)。如果时间到或者Notify()的话,有可能恰好获取CPU而进入了运行状态。也有可能是什么都具备了,但是没有获取CPU资格,而进入了就绪状态。
如果就绪状态或是运行状态遇到了什么问题或是等待IO等就会进入冻结状态。
创建线程的第二种方式:
实现runnable接口。
线程创建的两种方式:
方式1:
1. 继承Thread类。
2. 覆写run()方法。
3. 调用线程的start()方法。
start()方法的含义:
启动该线程。Java虚拟机调用该线程的run()方法。
方式2:
1. 定义类实现runnable接口。
2. 覆盖runnable接口中的run方法。(将线程要运行的代码存放到该run方法中)
3. 通过Thread类建立线程对象。
4. 将Runnable接口的子类对象作为实际参数传递给Thread类的构造函数。
5. 调用Thread类的start方法开启线程并调用runnable接口子类的run方法。
线程创建可以通过继承Thread类或者实现Runnable接口,但是两者有何区别?各有和优势呢?
因为java中类是单继承,所以如果是一个项目中,假设Student类继承了Person类的话,但是
Student类中的一部分代码也想通过多线程来执行,但是因为单继承的原因,就无法在继承Thread
类来通过覆写run()方法来完成。所以java工程师们就通过让我们是现接口的方式来
还可以将代码用多线程执行。
所以在定义线程时,多用实现runnable接口的方式来创建。
其实Thread类也继承了runnable接口,runnable接口的定义就是用来确立多线程代码的存放位置。
线程同步问题产生的原因:
当多条语句在操作同一个多线程共享数据。一个线程对多条语句只执行了一部分。
没有执行完。另一个线程就参与进来执行。导致共享数据的错误。
线程同步问题的解决方案:
对多条操作共享数据的语句,在某个时间段,只能让一个线程都执行完。在执行过程中,其他线程不可以参与执行。
同步的前提:
1. 必须要有两个或两个以上的线程。
2. 必须是多个线程使用同一个锁。
同步代码块的介绍:
synchronized(对象)
{
需要被同步的代码;
}
只要一个线程进入到同步代码块中,则其它的线程只能在该代码块外等待,不能执行该代码块。
什么代码应该放在同步代码块中:
1. 明确哪些代码是多线程运行代码。
2. 明确共享数据。
3. 明确多线程运行代码中哪些语句是操作共享数据的。
线程同步的例子:
以下是一个卖票的例子,如果线程不同步的话,会出现卖出0号票,-1,-2号票这种不正常的情况。
以上代码用到了同步,所以不会出现异常情况。
线程间的通信:
线程不仅仅要能够同步,即线程之间执行的时候互不影响。同时如果要能够相互按照某种规则有序执行,这就是线程通信。
以下是生产者和消费者的经典案例,演示了线程的同步和通信:
class Resource {
private boolean flag = false;
private int count = 0;
//生产的同步方法。
public synchronized void Produce() {
while (flag) {
try {
this.wait(); //如果flag=true,那么生产者停止。
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName()
+ ".......Produce goods.." + (++count));
flag = true;
this.notifyAll();
}
//消费的同步方法。
public synchronized void Custom() {
while (!flag) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName()
+ "..Custom goods.." + count);
flag = false;
this.notifyAll();
}
}
//创建生产者,继承runnable接口。
class Producer implements Runnable {
private Resource res;
public Producer(Resource res) {
this.res = res;
}
@Override
public void run() {
while (true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
res.Produce();
}
}
}
class Customer implements Runnable {
private Resource res;
public Customer(Resource res) {
this.res = res;
}
@Override
public void run() {
while (true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
res.Custom();
}
}
}
public class ProducerandCustomer {
public static void main(String[] args) {
Resource res = new Resource();
Customer cus = new Customer(res);
Producer pro = new Producer(res);
Thread p1 = new Thread(pro);
Thread p2 = new Thread(pro);
Thread c1 = new Thread(cus);
Thread c2 = new Thread(cus);
Thread c3 = new Thread(cus);
c1.start();
c2.start();
c3.start();
p1.start();
p2.start();
}
}
死锁:
死锁的原因是两个都进入同步方法中的线程,在争夺对方占有的资源,从而造成死锁。
具体编程时候的表现是:
同步中嵌套同步,而锁去不同。
以下是一个死锁的例子:
/*
* 虽然不是同一个DeadLockTest对象,但是锁是static的,所以不同的DeadLock用到的LockA,和LockB是相同的。
*/
public class DeadLock {
public static void main (String[] args) {
new Thread(new DeadLockTest(true)).start();
new Thread(new DeadLockTest(false)).start();
}
}
class DeadLockTest implements Runnable {
private boolean flag;
public DeadLockTest (Boolean flag) {
this.flag = flag;
}
@Override
//同步中嵌套同步,而锁却不同,这就是导致线程死锁的原因。
public void run() {
if (flag) {
synchronized (MyLock.lockA) {
System.out.println("IF ....LOCKA..");
synchronized (MyLock.lockB) {
System.out.println("IF-----------LOCKB");
}
}
} else {
synchronized (MyLock.lockB) {
System.out.println("ELSE---------LOCKB");
synchronized (MyLock.lockA) {
System.out.println("ELSE...LOCKA");
}
}
}
}
}
class MyLock {
static Object lockA = new Object();
static Object lockB = new Object();
}
停止线程:
停止线程的方式:
1.run()方法结束。
2.开启多线程运行,运行代码通常是循环结构。只要控制住循环,就可以让run方法结束,也就是让线程结束。
以下是线程结束的一个例子:
class StopThread implements Runnable
{
private boolean flag =true;
public void run()
{
while(flag)
{
System.out.println(Thread.currentThread().getName()+"....run");
}
}
public void changeFlag()
{
flag = false;
}
}
class StopThreadDemo
{
public static void main(String[] args)
{
StopThread st = new StopThread();
Thread t1 = new Thread(st);
Thread t2 = new Thread(st);
t1.setDaemon(true);
t2.setDaemon(true);
t1.start();
t2.start();
int num = 0;
while ( true )
{
if(num++ == 60)
{
//st.changeFlag();
//t1.interrupt();
//t2.interrupt();
break;
}
System.out.println(Thread.currentThread().getName()+"......."+num);
}
System.out.println("over");
}
}
}
特殊情况:
当线程处于冻结状态。就不会读取到标记,那么线程就不会结束。
该情况可以用Thread类的interrupt()方法来解决。
下边是对interrupt方法的解释:
如果线程在调用 Object 类的 wait(),join(),sleep(long)方法以及这些方法的重载方法,使线程受
阻,则使用interrupt()方法使线程的中断状态被清除,该线程还将收到一个 InterruptedException。
意思就是说将处于冻结状态的线程强制恢复到运行状态。
注意该方法并不是使线程终止。
守护线程:
守护线程其实就是后台线程:后台线程的运行和前台线程运行的情况是一样的。都在争夺CPU的执行权。只有停止的时候不一样。如果前台线程全部都停止了,后台线程都会自动终止。
无论此时该后台线程处于运行状态还是冻结状态,都要终止。
通过Thread类中的setDaemon().方法将线程创建为守护线程。
下边是对setDaemon()方法的描述:
public final void setDaemon(boolean on)
将该线程标记为守护线程或用户线程。当正在运行的线程都是守护线程时,Java 虚拟机退出。该方法必须在启动线程前调用。就是在start()方法前调用。
Join方法:
如果线程t调用Join()方法。则原来持有CPU执行权的线程释放掉执行权。
等待 t执行完以后,自己才继续执行。
class Demo implements Runnable
{
public void run()
{
for(int x=0; x<70; x++)
{
System.out.println(Thread.currentThread().toString()+"....."+x);
Thread.yield();
}
}
}
class JoinDemo
{
public static void main(String[] args) throws Exception
{
Demo d = new Demo();
Thread t1 = new Thread(d);
Thread t2 = new Thread(d);
t1.start();
t2.start();
t1.join();
for(int x=0; x<80; x++)
{
System.out.println("main....."+x);
}
System.out.println("over");
}
}
多线程之间数据共享的方式总结:
1. 如果每个线程执行的代码相同,可以使用同一个Runnable对象,这个Runnable对象中有那个共享的数据。
例如:卖票系统就可以这么做。
这个时候,仅仅是涉及到同步互斥,而没有涉及到通信问题。
2. 如果每个线程执行的代码不同,这个时候需要用不同的Runnable对象。有如下两种方式来实现Runnable对象之间的数据共享:注意操作不一致,并不代表就一定涉及到通信问题。可能就仅仅是不让各线程在获取CPU执行的时候,不被打扰。
一.
将共享数据封装在另外一个对象中,然后将这个对象逐一传递给各个Runnable对象。每个线程对共享数据的操作也分配到 那个对象身上去完成。这样容易实现针对该数据进行的各个操作的互斥和通信。
二.
将这些Runnable对象作为某一个类中的内部类,共享数据作为这个外部类中的成员变量。每个线程对共享数据的操作方法也分配给外部类,以便实现对共享数据进行的各个操作的互斥和通信。作为内部类的各个Runnable对象调用外部类的这些方法。
上述两种方式的组合:
将共享数据封装在另一个对象中,每个线程对共享数据的操作方法也分配到那个对象身上去完成。对象作为这个外部类中的成员变量或者是方法中的局部变量。每个线程的Runnable对象作为外部类中的成员内部类或局部内部类。
总之,要同步互斥的几段代码最好是分别放在几个独立的方法中,这些方法再放在同一个类中,这样容易实现同步互斥和通信。
经验:要用到共同数据(包括同步锁)的若干方法应该归在同一个类身上,这种设计正好体现了高内聚和程序的健壮性。
同步互斥以及通信的代码应该放在代表资源的方法内部,而不是放在线程代码中。
这是因为放在资源的内部的时候,这样一个新的线程调用方法的时候,就不用在考虑同步互斥以及通信,因为调用的这些方法已经是同步和互斥的。如果写在线程代码中,每启用一个新的线程,就要写一次重复的代码,还要考虑同步互斥问题。
有几个不同的操作,就要有几个不同的Runnable对象,因为Runnable对象是用来装载运行的代码的。这要和线程的概念区分开。两个线程共用一个Runnable,就是在运行相同的代码。
而不同的Thread,就需要创建不同的run对象或者不同的run方法。
局部变量是不会出现数据共享的,一般都是成员变量才可以被数据共享。
方式1举例:
public class MultiThreadShareData2 {
public static void main(String[] args) {
ShareData2 sd = new ShareData2();
new Thread(new MyRunnable1(sd)).start();
new Thread(new MyRunnable2(sd)).start();
}
}
class MyRunnable1 implements Runnable {
private ShareData2 sd;
public MyRunnable1(ShareData2 sd) {
this.sd = sd;
}
@Override
public void run() {
while (true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
sd.increment();
}
}
}
class MyRunnable2 implements Runnable {
private ShareData2 sd;
public MyRunnable2(ShareData2 sd) {
this.sd = sd;
}
@Override
public void run() {
while (true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
sd.decrement();
}
}
}
class ShareData2 {
private int j = 0;
private boolean Shouldincrease = true;
public synchronized void increment() {
while (!Shouldincrease) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("Increase j :" + (++j));
Shouldincrease = false;
this.notifyAll();
}
public synchronized void decrement() {
while (Shouldincrease) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("decrease j :" + (--j));
Shouldincrease = true;
this.notifyAll();
}
}
方式2:
public class MultiThreadShareData3 {
private int j = 0;
private boolean Shouldincrease = true;
public synchronized void increment() {
while (!Shouldincrease) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("Increase j :" + (++j));
Shouldincrease = false;
this.notifyAll();
}
public synchronized void decrement() {
while (Shouldincrease) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("decrease j :" + (--j));
Shouldincrease = true;
this.notifyAll();
}
public static void main(String[] args) {
final MultiThreadShareData3 mts = new MultiThreadShareData3();
new Thread(new Runnable() {
@Override
public void run() {
while (true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
mts.increment();
}
}
}).start();
new Thread (new Runnable() {
@Override
public void run() {
while (true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
mts.decrement();
}
}
}).start();
}
}
方式3:
//这是在涉及到通信的问题的时候,也就是具有不同的操作,要互相协调进行。
//将资源对象作为外部类的成员变量或者是局部变量,Runnable对象作为该
//外部类的内部类调用这个资源中的方法。
//这个是结合后的方式。
public class MultiThreadShareData1 {
public static void main(String[] args) {
final ShareData1 sd = new ShareData1();
new Thread(new Runnable() {
@Override
public void run() {
while (true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
sd.increment();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
while (true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
sd.decrement();
}
}
}).start();
}
}
class ShareData1 {
private int j = 0;
private boolean Shouldincrease = true;
public synchronized void increment() {
while (!Shouldincrease) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("Increase j :" + (++j));
Shouldincrease = false;
this.notifyAll();
}
public synchronized void decrement() {
while (Shouldincrease) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("decrease j :" + (--j));
Shouldincrease = true;
this.notifyAll();
}
}
ThreadLocal实现线程范围内数据共享:
关于线程范围内的变量共享的举例:
监狱里罪犯的排队打饭,针对A罪犯,那几个打饭和打菜和打汤的模块操作的饭盆是A罪犯相关的饭盆;针对B罪犯,那几个打饭和打菜和打汤的模块操作的饭盆是B罪犯相关的饭盆。
对于相同的程序代码,多个模块在同一个线程中运行时要共享一份数据,而在另外线程中运行时又共享另外一份数据。
每个线程调用全局ThreadLocal对象的set方法,就相当于往其内部的map中增加一条记录,key分别是各自的线程,value是各自的set方法传进去的值。在线程结束时可以调用ThreadLocal.clear()方法,这样会更快释放内存,不调用也可以,因为线程结束后也可以自动释放相关的ThreadLocal变量。
ThreadLocal的应用场景:
1. 订单处理包含一系列操作:减少库存量、增加一条流水账、修改总账。这几个操作要在同一个事务中(即相同的数据库连接中)完成,通常也即同一个线程中进行处理。如果其中的任何一个操作失败了,则应该把前面的操作回滚。否则,提交所有操作。这要求这些操作使用相同的数据库连接对象,而这些操作的代码分别位于不同的模块类中。
2. 银行转账包含一系列操作: 把转出帐户的余额减少,把转入帐户的余额增加,这两个操作要在同一个事务中完成,它们必须使用相同的数据库连接对象,转入和转出操作的代码分别是两个不同的帐户对象的方法。
3. Structs2的ActionContext,同一段代码被不同的线程调用运行时,该代码操作的数据是每个线程各自的状态和数据,对于不同的线程来说,getContext方法拿到的对象都不相同,对同一个线程来说,不管调用getContext方法多少次和在哪个模块中getContext方法,拿到的都是同一个。
总结:一个ThreadLocal代表一个变量,故其中只能放一个数据,如果有成千上万的数据要线程共享呢?
可以先定义一个对象来装这些数据,然后在ThreadLocal中存储这个对象。
使用ThreadLocal实现线程间数据共享的例子:
public class ThreadLocalTest {
static ThreadLocal<Integer> tl = new ThreadLocal<Integer>();
public static void main(String[] args) {
for (int i = 0; i < 2; i++) {
new Thread(new Runnable() {
@Override
// 此处模拟的是模块A,B在某个线程上运行。
public void run() {
int data = new Random().nextInt();
System.out.println(Thread.currentThread().getName()
+ " has put data:" + data);
tl.set(data);
new A().get();
new B().get();
}
}).start();
}
}
static class A {
public void get() {
int data = tl.get();
System.out.println("A from " + Thread.currentThread().getName()
+ " get data:" + data);
}
}
static class B {
public void get() {
int data = tl.get();
System.out.println("B from " + Thread.currentThread().getName()
+ " get data:" + data);
}
}
}
线程并发库中的工具包:
软件包 java.util.concurrent.atomic的描述
类的小工具包,支持在单个变量上解除锁的线程安全编程。
AtomicBoolean |
可以用原子方式更新的 |
AtomicInteger |
可以用原子方式更新的 |
AtomicIntegerArray |
可以用原子方式更新其元素的 |
AtomicIntegerFieldUpdater<T> |
基于反射的实用工具,可以对指定类的指定 |
AtomicLong |
可以用原子方式更新的 |
AtomicLongArray |
可以用原子方式更新其元素的 |
AtomicLongFieldUpdater<T> |
基于反射的实用工具,可以对指定类的指定 |
线程池:
线程池的概念:
首先创建一些线程,它们的集合称为线程池,当服务器接受到一个客户请求后,就从线程池中取出一个空闲的线程为之服务,服务完后不关闭该线程,而是将该线程还回到线程池中。
在线程池的编程模式下,任务是提交给整个线程池,而不是直接交给某个线程,线程池在拿到任务后,它就在内部找有无空闲的线程,再把任务交给内部某个空闲的线程,这就是封装。记住,任务是提交给整个线程池,一个线程同时只能执行一个任务,但可以同时向一个线程池提交多个任务。
此时可以改进以前我们写的多个客户端来访问服务器:
以前做的总是每来一个客户,服务器端就接收,创建一个线程来服务,服务完了,这个线程也就死掉了。
若访问服务器的客户端很多,那么服务器要不断地创建和销毁线程,这将严重影响服务器的性能
如果用了线程池的话。就可以当服务器接受到一个客户请求后,就从线程池中取出一个空闲的线程为之服务,服务完后不关闭该线程,而是将该线程还回到线程池中。这样服务器的性能就提高了。
以下是常用的三种创建线程池的例子:
创建固定个数的线程的线程池
public class ThreadPoolTest {
public static void main(String[] args) {
// 线程池中3个线程来执行10个任务,每个任务的内容都是循环10次。
// 当线程池中无空闲线程的时候,其他任务只能是等待。只有当前的
// 任务执行完后,才能有空余线程去执行剩余的任务。
ExecutorService threadpool = Executors.newFixedThreadPool(3);
// 向线程池中加入了10个任务。
for (int i = 0; i < 10; i++) {
final int task = i;
// 向线程池中加入一个任务。
threadpool.execute(new Runnable() {
@Override
public void run() {
for (int j = 0; j < 10; j++) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()
+ "loop of :" + j + " for task of:" + task);
}
}
});
}
// threadpool.shutdown();当提交的任务都执行完后,将所有的线程摧毁。
// threadpool.shutdownNow();不管任务是否执行完成,都将所有的线程摧毁。
}
}
创建动态增减线程数量的线程池:
public class ThreadPoolTest1 {
// 该池子根据需要服务的任务数量,动态的增减线程数量。
// 例如(以下仅是描述了一个情况而以):
// 本来有3个任务,就创建3个线程来执行。如果突然又增减了4个任务,这个时候,如果前
// 3个任务还没有执行完成,线程池就会再创建4个线程来为新的任务服务。过了一段时间
// 只有一个任务需要执行了,因为池子中现在有7个线程,所以让一个线程执行,其他的5个
// 超过等待时间就会被摧毁。
public static void main(String[] args) {
// 缓存线程池,该池子中的线程的个数是不确定的。
ExecutorService threadpool = Executors.newCachedThreadPool();
// 向线程池中加入了10个任务。
for (int i = 0; i < 10; i++) {
final int task = i;
// 向线程池中加入一个任务。
threadpool.execute(new Runnable() {
@Override
public void run() {
for (int j = 0; j < 10; j++) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()
+ " loop of : " + j + " for task of: "
+ task);
}
}
});
}
}
}
创建恒有一个线程的线程池:
public class ThreadPoolTest2 {
// 总是保证线程池中有一个线程。但是这个线程从始至终不一定是同一个线程。
// 可能是一个线程死掉了,又启动了一个新的线程。不管怎样,总是保证池子中有一个线程的存在。
public static void main(String[] args) {
// 该池子中仅有一个线程。
ExecutorService threadpool = Executors.newSingleThreadExecutor();
// 向线程池中加入了10个任务。
for (int i = 0; i < 10; i++) {
final int task = i;
// 向线程池中加入一个任务。
threadpool.execute(new Runnable() {
@Override
public void run() {
for (int j = 0; j < 10; j++) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()
+ " loop of : " + j + " for task of: "
+ task);
}
}
});
}
}
}
Callable和Future
通过线程池的submit方法提交Callable对象,目的是用线程池去执行Callable对象中call方法中的代码块,整个submit方法返回的结果类型是一个Future对象。可以通过Futrue对象的get()方法获取Callable对象中的call方法返回的数据。这是通过泛型来实现的。
所以Futrue<v>的泛型v一定要和Callable<T>的泛型T一致才可以。
Callable要采用ExecutorSevice的submit方法提交,返回的future对象可以取消任务。
CompletionService
用于提交一组Callable任务,其take方法返回已完成的一个Callable任务对应的Future对象。
好比我同时种了几块地的麦子,然后就等待收割。收割时,则是那块先成熟了,则先去收割哪块麦子。
Callable和Future应用的例子:
public class CallableandFutrue {
public static void main(String[] args) throws Exception {
//SingleCallableandFutrue();
ManyCallableandFuture();
}
@SuppressWarnings("unused")
private static void SingleCallableandFutrue() {
ExecutorService threadpool = Executors.newSingleThreadExecutor();
Future<String> future = threadpool.submit(new Callable<String>() {
@Override
public String call() throws Exception {
Thread.sleep(10000);
return "Hello!";
}
});
System.out.println("Wait the result....");
try {
System.out.println("Get the result:" + future.get());
} catch (Exception e) {
e.printStackTrace();
}
}
private static void ManyCallableandFuture() {
ExecutorService threadpool = Executors.newFixedThreadPool(3);
CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(
threadpool);
for (int i = 0; i < 10; i++) {
final int seq = i;
completionService.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
Thread.sleep(new Random().nextInt(5000));
return seq;
}
});
}
for (int i = 0; i < 10; i++) {
try {
System.out.println(completionService.take().get());
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
缓冲队列:
假设用一个10个元素的数组作为缓冲区的话,那么如果输入的话,则是要循环的输入,当缓冲区满了以后,要输入要阻塞,不能继续在输入,只有有元素被输出,有空余位置了,再输入元素。
输出的时候,如果是缓冲区中没有了元素,那么输出就要阻塞,等到放入了元素的话,在进行输出。
Semaphore可以维护当前访问自身的线程个数,并提供了同步机制。使用Semaphore可以控制同时访问资源的线程个数,例如,实现一个文件允许的并发访问数。
Semaphore实现的功能就类似厕所有5个坑,假如有十个人要上厕所,那么同时能有多少个人去上厕所呢?同时只能有5个人能够占用,当5个人中的任何一个人让开后,其中在等待的另外5个人中又有一个可以占用了。
另外等待的5个人中可以是随机获得优先机会,也可以是按照先来后到的顺序获得机会,这取决于构造Semaphore对象时传入的参数的选项。
单个信号量的Semaphore对象可以实现互斥锁的功能,并且可以是由一个线程获得了“锁”,再由另一个线程释放“锁”,这可应用于死锁恢复的一些场合。
CyclicBarrie的应用:
CyclicBarrie
\ | /
\ | /
------------------------假如是三个线程干完各自的任务,在不同的时刻到达集合点后,就可以接着忙各自的工作去了,再到达新的集合点,再去忙各自的工作,到达集合点了用CyclicBarrier对象的await方法表示。
/ | \
/ | \
好比是9点开始大家在公园里,说12点开饭。那么中间的间隙是大家的自由活动的时间,等到快12点的时候,大家都陆陆续续的来了,第一个人来的时候,等着后边的人,不能吃饭,只有最后一个人来了,才能开饭。
例子:
public class CyclicBarrierTest {
public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
final CyclicBarrier cb = new CyclicBarrier(3);
for (int i = 0; i < 3; i++) {
Runnable runnable = new Runnable() {
public void run() {
try {
Thread.sleep((long) (Math.random() * 10000));
System.out.println("线程"
+ Thread.currentThread().getName()
+ "即将到达集合地点1,当前已有"
+ (cb.getNumberWaiting() + 1)
+ "个已经到达,"
+ (cb.getNumberWaiting() == 2 ? "都到齐了,继续走啊"
: "正在等候"));
cb.await();
Thread.sleep((long) (Math.random() * 10000));
System.out.println("线程"
+ Thread.currentThread().getName()
+ "即将到达集合地点2,当前已有"
+ (cb.getNumberWaiting() + 1)
+ "个已经到达,"
+ (cb.getNumberWaiting() == 2 ? "都到齐了,继续走啊"
: "正在等候"));
cb.await();
Thread.sleep((long) (Math.random() * 10000));
System.out.println("线程"
+ Thread.currentThread().getName()
+ "即将到达集合地点3,当前已有"
+ (cb.getNumberWaiting() + 1)
+ "个已经到达,"
+ (cb.getNumberWaiting() == 2 ? "都到齐了,继续走啊"
: "正在等候"));
cb.await();
} catch (Exception e) {
e.printStackTrace();
}
}
};
service.execute(runnable);
}
service.shutdown();
}
}
CountDownLatch:
犹如倒计时计数器,调用CountDownLatch对象的countDown方法就将计数器减1,当计数到达0时,则所有等待者或单个等待者开始执行。
可以实现一个人(也可以是多个人)等待其他所有人都来通知他,这犹如一个计划需要多个领导都签字后才能继续向下实施。还可以实现一个人通知多个人的效果,类似裁判一声口令,运动员同时开始奔跑。
public class CountdownLatchTest {
public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
final CountDownLatch cdOrder = new CountDownLatch(1);
final CountDownLatch cdAnswer = new CountDownLatch(3);
for (int i = 0; i < 3; i++) {
Runnable runnable = new Runnable() {
public void run() {
try {
System.out.println("线程"
+ Thread.currentThread().getName() + "正准备接受命令");
cdOrder.await();
System.out.println("线程"
+ Thread.currentThread().getName() + "已接受命令");
Thread.sleep((long) (Math.random() * 10000));
System.out
. println("线程"
+ Thread.currentThread().getName()
+ "回应命令处理结果");
cdAnswer.countDown();
} catch (Exception e) {
e.printStackTrace();
}
}
};
service.execute(runnable);
}
try {
Thread.sleep((long) (Math.random() * 10000));
System.out.println("线程" + Thread.currentThread().getName()
+ "即将发布命令");
cdOrder.countDown();
System.out.println("线程" + Thread.currentThread().getName()
+ "已发送命令,正在等待结果");
cdAnswer.await();
System.out.println("线程" + Thread.currentThread().getName()
+ "已收到所有响应结果");
} catch (Exception e) {
e.printStackTrace();
}
service.shutdown();
}
}
Exchanger:
用于实现两个线程之间的数据交换,每一个线程在完成一定的事务后执,当第一个到达Exchanger对象的exchange方法的时候,拿出数据一直等待第二个线程,直到第二个线程也执行到exchange方法。这就相当于是两个线程碰面了,就可以交换数据了。
public class ExchangerTest {
public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
final Exchanger<String> exchanger = new Exchanger<String>();
service.execute(new Runnable() {
public void run() {
try {
Thread.sleep((long) (Math.random() * 10000));
String data1 = "zxx";
System.out.println("线程" + Thread.currentThread().getName()
+ "正在把数据" + data1 + "换出去");
String data2 = (String) exchanger.exchange(data1);
System.out.println("线程" + Thread.currentThread().getName()
+ "换回的数据为" + data2);
} catch (Exception e) {
}
}
});
service.execute(new Runnable() {
public void run() {
try {
Thread.sleep((long) (Math.random() * 10000));
String data1 = "lhm";
System.out.println("线程" + Thread.currentThread().getName()
+ "正在把数据" + data1 + "换出去");
String data2 = (String) exchanger.exchange(data1);
System.out.println("线程" + Thread.currentThread().getName()
+ "换回的数据为" + data2);
} catch (Exception e) {
}
}
});
}
}
ArrayBlockingQueue:
抛出异常 |
特殊值 |
阻塞 |
超时 |
|
插入 |
add(e) |
offer(e) |
put(e) |
offer(e, time, unit) |
移除 |
remove() |
poll() |
take() |
poll(time, unit) |
检查 |
element() |
peek() |
不可用 |
不可用 |
阻塞队列与Semaphore有些相似,但也不同,阻塞队列是一方存放数据,另一方释放数据,Semaphore通常则是由同一方设置和释放信号量。
ArrayBlockingQueue
只有put方法和take方法才具有阻塞功能
用两个具有1个空间的阻塞队列来实现同步通知的功能。
public class BlockingQueueTest {
public static void main(String[] args) {
final BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(3);
for (int i = 0; i < 2; i++) {
new Thread() {
public void run() {
while (true) {
try {
Thread.sleep((long) (Math.random() * 1000));
System.out.println(Thread.currentThread().getName()
+ "准备放数据!");
queue.put(1);
System.out.println(Thread.currentThread().getName()
+ "已经放了数据," + "队列目前有" + queue.size()
+ "个数据");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}.start();
}
new Thread() {
public void run() {
while (true) {
try {
// 将此处的睡眠时间分别改为100和1000,观察运行结果
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName()
+ "准备取数据!");
queue.take();
System.out.println(Thread.currentThread().getName()
+ "已经取走数据," + "队列目前有" + queue.size() + "个数据");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}.start();
}
}
并发的同步集合:
传统的集合是不同步的。如果是多线程操作的话,就会出现问题。
这是为什么呢?
while(it.hasnext()){
it.next();
}
在hasnext所在的类中用一个count来记录集合总元素的个数。
假设count=4;
hasnext(){
if(curor==count){
return false;
}
return true;
}
next(){
list.get( curor++ );
}
remove(){
count- -;
}
以上代码是迭代器中对集合中元素操作的方法的大致的思路。
假设一个线程在迭代最后一个元素的时候,运行了it.next().也就是得到了集合中的第四个元素,索引值为3,然后是curor++,这样curor=4了,但是此时另一个线程调用了remove方法,这个时候的话count就会=3,当再次执行while循环判断的时候curor=4,而count=3,两者不相等,就会进入了死循环的状态。
造成不安全的原因是:一个线程在读的过程称,另一个线程对集合进行了其他的操作。
在jdk5.0以前的解决方案是:
|
synchronizedCollection |
|
|
synchronizedList |
|
|
synchronizedMap |
|
|
synchronizedSet |
|
|
synchronizedSortedMap |
|
|
synchronizedSortedSet |
通过Collections提供的工具类中的方法,传递一个传统的集合,返回一个同步的集合。这是通过代理来实现的,虽然这是以前的解决方案,但是思想是可以借鉴的。
并发 Collection
ConcurrentHashMap
ConcurrentSkipListMap
ConcurrentSkipListSet
CopyOnWriteArrayList
CopyOnWriteArraySet
当期望许多线程访问一个给定 collection时,ConcurrentHashMap通常优于同步的HashMap。
ConcurrentSkipListMap通常优于同步的TreeMap。
当期望的读数和遍历远远大于列表的更新数时,CopyOnWriteArrayList优于同步的ArrayList。
此包中与某些类一起使用的“Concurrent”前缀;是一种简写,表明与类似的“同步”类有所不同”。
例如Collections.synchronizedMap(new HashMap())是同步的,但ConcurrentHashMap
则是“并发的”。
并发 collection 是线程安全的(在底层已经实现了),但是不受单个排他锁的管理(例如同步代码块的锁)。在 ConcurrentHashMap这一特定情况下,它可以安全地允许进行任意数目的并发读取,以及数目可调的并发写入。
也就是说用了并发 Collection,就不用再考虑同步的问题了,因为底层设计的就是线程安全的,好处是有并发性。允许多个外部引用同时对Collection对象进行操作。
Timer组件:
|
schedule |
|
schedule |
|
schedule |
|
schedule |
|
scheduleAtFixedRate |
|
scheduleAtFixedRate |
public class TimerFirst {
public static void main(String[] args) {
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
System.out.println("Big booming!");
}
}, 3000);
new Timer().schedule(new TimerTask() {
@Override
public void run() {
System.out.println("BoomBing!!!");
}
}, 1000, 5000);
while (true) {
System.out.println(new Date().getSeconds());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
-----------------------android培训、java培训、期待与您交流! --------------------------------------------------------------------------------------------------------------------------------------------------
详细请查看:http://edu.csdn.net/heima
相关文章
- 暂无相关文章
用户点评