在JDK1.5以后,java里面提供了很多的并发容器,这里我们用的是一个queue,队列。所谓队列其实就是一个容器,就是站成一对,不管票还是人都在里面排成一堆,队列有几种,有先进先出的,还有两端的队列,还有就是栈,先进后出,先加进去的后出来。这里用了一个concurrentlinkedqueue,并发的链表队列。线程里面调用了一个poll方法,意思是往外面拿一个数据,相当于在尾巴里面拿一个,如果没有拿到,他的返回值就是空,那么就中断线程。这里面没有加锁,同样有判断,但是不会出问题。完成卖票功能这种效率是比较高的。queue里面是不能装空值。这里虽然判断和操作是一起的,但是我们没有在判断里面有任何操作,大不了反过头来再拿一边,poll底层实现是cas,这里我们就不用加锁了。
private static Queue<Integer> tickets = new ConcurrentLinkedQueue<>();
static {
for (int i = 0; i < 10000; i++) {
tickets.add(i);
}
}
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
new Thread(()->{
while(true){
Integer poll = tickets.poll();
if(poll == null){
break;
}
System.out.println("销售票编号:" + poll);
}
}).start();
}
}
在往集合中添加数据的时候,先拷贝存储的数组,然后添加元素到拷贝好的数组中,然后用现在的数组去替换成员变量的数组(就是get等读取操作读取的数组)。这个机制和读写锁是一样的,但是比读写锁有改进的地方,那就是读取的时候可以写入的 ,这样省去了读写之间的竞争,看了这个过程,你也发现了问题,同时写入的时候怎么办呢,当然果断还是加锁。读多写少可以用copyonwritelist。
public static void main(String[] args) {
// List<String> lists = new ArrayList<>();
// List<String> lists = new Vector<>();
List<String> lists = new CopyOnWriteArrayList<>();
Random r = new Random();
Thread[] threads = new Thread[100];
for (int i = 0; i < threads.length; i++) {
Runnable task = new Runnable() {
@Override
public void run() {
for (int j = 0; j < 1000; j++) {
lists.add("A" + r.nextInt(10000));
}
}
};
threads[i] = new Thread(task);
}
run(threads);
System.out.println(lists.size());
}
private static void run(Thread[] threads) {
long start = System.currentTimeMillis();
Arrays.asList(threads).forEach(t->t.start());
Arrays.asList(threads).forEach(t->{
try {
t.join();
} catch (Exception e) {
e.printStackTrace();
}
});
long end = System.currentTimeMillis();
System.out.println(end - start);
}
collections是java里面一个集合处理类,里面有给容器加锁的方法,通过调用api可以返回一个加了锁的容器。
并发的hashmap,这个例子测试一下效率。第一种用hashtable,hashtable所有方法都加了锁了,第二种concurrenthashmap,大致能看出来他的效率要比hashtable要高一些,在多线程的情况下。为什么呢,因为hashtable往里面加任何数据的时候都是要锁定整个对象,而
concurrenthashmap,是分成十六个段,每次插数据的时候,只会锁住一小段,1.8之后实现不同。
public static void main(String[] args) {
// Map<String, String> map = new ConcurrentHashMap<>();
Map<String, String> map = new ConcurrentSkipListMap<>();
// Map<String, String> map = new Hashtable<>();
// Map<String, String> map = new HashMap<>();
// Map<String, String> map1 = Collections.synchronizedMap(map);
Random random = new Random();
Thread[] threads = new Thread[100];
CountDownLatch latch = new CountDownLatch(threads.length);
long start_time = System.currentTimeMillis();
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(()->{
for(int j=0; j<10000;j++) {
map.put("a" + random.nextInt(100000), "a" + random.nextInt(100000));
// map1.put("a" + random.nextInt(100000), "a" + random.nextInt(100000));
}
latch.countDown();
});
}
Arrays.asList(threads).forEach(t->t.start());
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
long end_time = System.currentTimeMillis();
System.out.println(end_time-start_time);
}
阻塞式的容器
private static BlockingQueue<String> strings = new LinkedBlockingQueue<>(10);
public static void main(String[] args) {
new Thread(()->{
for (int i = 0; i < 100; i++) {
try {
// 在阻塞式容器里面加了一个方法,put,也就是如果满了就会等待,对应的方法叫take,如果空了就会等待。
// 这种容器我们去用的时候自动就实现了阻塞式的生产者消费者。
strings.put("商品" + i);
} catch (Exception e) {
e.printStackTrace();
}
}
}, "producer").start();
for (int i = 0; i < 5; i++) {
new Thread(()->{
for(;;){
try {
// take,拿,如果空了也会阻塞
System.out.println(Thread.currentThread().getName() + " take " + strings.take()); //如果空了,就会等待
} catch (Exception e) {
e.printStackTrace();
}
}
},"consumer" + i).start();
}
}
有界队列,意思就是说这个队列能装的元素的个数是固定的,后面讲线程池的时候,里面装的其实是一个个任务。这里只能装10个,如果超过了可能会出问题可能会阻塞,这里看你调用什么方法。
private static BlockingQueue<String> strings = new ArrayBlockingQueue<>(10);
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 10; i++) {
strings.put("a" + i);
}
strings.add("aaaa");
// strings.put("aaaa");
// strings.offer("aaaa");
strings.offer("aaaa",1, TimeUnit.SECONDS);
System.out.println(strings);
}
容器里每一个元素都设置了一个时间,时间到了才能从中提取元素
private static BlockingQueue<MyTask> tasks = new DelayQueue<>();
static class MyTask implements Delayed{
long runningTime;
public MyTask(long rt) {
this.runningTime = rt;
}
@Override
public int compareTo(Delayed o) {
if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MICROSECONDS)) {
return -1;
}else if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) {
return 1;
}else {
return 0;
}
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(runningTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public String toString() {
return "" + runningTime;
}
public static void main(String[] args) throws InterruptedException {
long now = System.currentTimeMillis();
MyTask t1 = new MyTask(now+1000);
MyTask t2 = new MyTask(now+2000);
MyTask t3 = new MyTask(now+1500);
MyTask t4 = new MyTask(now+2500);
MyTask t5 = new MyTask(now+500);
tasks.put(t1);
tasks.put(t2);
tasks.put(t3);
tasks.put(t4);
tasks.put(t5);
System.out.println(tasks);
for (int i = 0; i < 5; i++) {
System.out.println(tasks.take());
}
}
}
和普通的queue的方法差不多,多了一个transfer方法。如果你用这种队列的话,往往是消费者先启动,生产者生产一个东西的时候,他先是去找消费者,如果有消费者就直接丢给消费者。
public static void main(String[] args) throws InterruptedException {
LinkedTransferQueue<String> strings = new LinkedTransferQueue<>();
new Thread(()->{
try {
System.out.println("t1"+strings.take());
} catch (Exception e) {
e.printStackTrace();
}
}).start();
new Thread(()->{
try {
System.out.println("t2"+strings.take());
} catch (Exception e) {
e.printStackTrace();
}
}).start();
TimeUnit.SECONDS.sleep(2);
strings.transfer("aaa");
// strings.put("aaa");
System.out.println(strings.size());
// new Thread(()->{
// try {
// System.out.println(strings.take());
// } catch (Exception e) {
// e.printStackTrace();
// }
// }).start();
}
同步队列,同步队列是容量为0,也就是来的东西必须给消费掉,首先启动一个消费者,调用add方法,他报错了只能调用put,意思就是阻塞等待消费者消费。put里面其实用的是transfer,任何东西必须消费,不能往容器里面扔。
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> strings = new SynchronousQueue<>();
new Thread(()->{
try {
System.out.println(strings.take());
} catch (Exception e) {
e.printStackTrace();
}
}).start();
// strings.add("aaa");
strings.put("aaa");
strings.put("aaa");
strings.put("aaa");
strings.put("aaa");
strings.put("aaa");
System.out.println(strings.size());
}