java中的并发容器

Published on with 0 views and 0 comments

1、ConcurrentLinkedQueue

    在JDK1.5以后,java里面提供了很多的并发容器,这里我们用的是一个queue,队列。所谓队列其实就是一个容器,就是站成一对,不管票还是人都在里面排成一堆,队列有几种,有先进先出的,还有两端的队列,还有就是栈,先进后出,先加进去的后出来。这里用了一个concurrentlinkedqueue,并发的链表队列。线程里面调用了一个poll方法,意思是往外面拿一个数据,相当于在尾巴里面拿一个,如果没有拿到,他的返回值就是空,那么就中断线程。这里面没有加锁,同样有判断,但是不会出问题。完成卖票功能这种效率是比较高的。queue里面是不能装空值。这里虽然判断和操作是一起的,但是我们没有在判断里面有任何操作,大不了反过头来再拿一边,poll底层实现是cas,这里我们就不用加锁了。

private static Queue<Integer> tickets = new ConcurrentLinkedQueue<>();
	
	static {
		for (int i = 0; i < 10000; i++) {
			tickets.add(i);
		}
	}
	
	public static void main(String[] args) {
		for (int i = 0; i < 10; i++) {
			new Thread(()->{
				while(true){
					Integer poll = tickets.poll();
					if(poll == null){
						break;
					}
					System.out.println("销售票编号:" + poll);
				}
			}).start();
		}
	}

2、CopyOnWriteList

    在往集合中添加数据的时候,先拷贝存储的数组,然后添加元素到拷贝好的数组中,然后用现在的数组去替换成员变量的数组(就是get等读取操作读取的数组)。这个机制和读写锁是一样的,但是比读写锁有改进的地方,那就是读取的时候可以写入的 ,这样省去了读写之间的竞争,看了这个过程,你也发现了问题,同时写入的时候怎么办呢,当然果断还是加锁。读多写少可以用copyonwritelist。

	public static void main(String[] args) {
//		List<String> lists = new ArrayList<>();
//		List<String> lists = new Vector<>();
		List<String> lists = new CopyOnWriteArrayList<>();
		Random r = new Random();
		Thread[] threads = new Thread[100];
		
		for (int i = 0; i < threads.length; i++) {
			Runnable task = new Runnable() {
				@Override
				public void run() {
					for (int j = 0; j < 1000; j++) {
						lists.add("A" + r.nextInt(10000));
					}
				}
			};
			threads[i] = new Thread(task);
		}
		
		run(threads);
		
		System.out.println(lists.size());
	}

	private static void run(Thread[] threads) {
		long start = System.currentTimeMillis();
		Arrays.asList(threads).forEach(t->t.start());
		Arrays.asList(threads).forEach(t->{
			try {
				t.join();
			} catch (Exception e) {
				e.printStackTrace();
			}
		});
		long end = System.currentTimeMillis();
		System.out.println(end - start);
	}

3、Collections

    collections是java里面一个集合处理类,里面有给容器加锁的方法,通过调用api可以返回一个加了锁的容器。

4、ConcurrentHashMap

    并发的hashmap,这个例子测试一下效率。第一种用hashtable,hashtable所有方法都加了锁了,第二种concurrenthashmap,大致能看出来他的效率要比hashtable要高一些,在多线程的情况下。为什么呢,因为hashtable往里面加任何数据的时候都是要锁定整个对象,而
concurrenthashmap,是分成十六个段,每次插数据的时候,只会锁住一小段,1.8之后实现不同。

public static void main(String[] args) {
//		Map<String, String> map = new ConcurrentHashMap<>();
		Map<String, String> map = new ConcurrentSkipListMap<>();
//		Map<String, String> map = new Hashtable<>();

//		Map<String, String> map = new HashMap<>();
//		Map<String, String> map1 = Collections.synchronizedMap(map);

		Random random = new Random();
		Thread[] threads = new Thread[100];
		CountDownLatch latch = new CountDownLatch(threads.length);
		long start_time = System.currentTimeMillis();
		for (int i = 0; i < threads.length; i++) {
			threads[i] = new Thread(()->{
				for(int j=0; j<10000;j++) {
					map.put("a" + random.nextInt(100000), "a" + random.nextInt(100000));
//					map1.put("a" + random.nextInt(100000), "a" + random.nextInt(100000));
				}
				latch.countDown();
			});
		}
		Arrays.asList(threads).forEach(t->t.start());
		try {
			latch.await();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		long end_time = System.currentTimeMillis();
		System.out.println(end_time-start_time);
	}

5、LinkedBlockingQueue

    阻塞式的容器

	private static BlockingQueue<String> strings = new LinkedBlockingQueue<>(10);

	public static void main(String[] args) {
		new Thread(()->{
			for (int i = 0; i < 100; i++) {
				try {
					// 在阻塞式容器里面加了一个方法,put,也就是如果满了就会等待,对应的方法叫take,如果空了就会等待。
					// 这种容器我们去用的时候自动就实现了阻塞式的生产者消费者。
					strings.put("商品" + i);
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
		}, "producer").start();

		for (int i = 0; i < 5; i++) {
			new Thread(()->{
				for(;;){
					try {
						// take,拿,如果空了也会阻塞
						System.out.println(Thread.currentThread().getName() + " take " + strings.take()); //如果空了,就会等待
					} catch (Exception e) {
						e.printStackTrace();
					} 
				}
			},"consumer" + i).start();
		}

	}

6、ArrayBlockingQueue

    有界队列,意思就是说这个队列能装的元素的个数是固定的,后面讲线程池的时候,里面装的其实是一个个任务。这里只能装10个,如果超过了可能会出问题可能会阻塞,这里看你调用什么方法。

  • add会报异常
  • offer不会报异常,他只通过布尔类型的返回值来告诉你是加成功了还是没有加成功。
  • offer可以设置时间,如果这段时间加不进去就不加了也就是返回false
  • put方法是满了会阻塞住。

	private static BlockingQueue<String> strings = new ArrayBlockingQueue<>(10);
	
	public static void main(String[] args) throws InterruptedException {
		for (int i = 0; i < 10; i++) {
			strings.put("a" + i);
		}
		strings.add("aaaa");
//		strings.put("aaaa");
//		strings.offer("aaaa");
		strings.offer("aaaa",1, TimeUnit.SECONDS);
		System.out.println(strings);
	}

7、DelayQueue

    容器里每一个元素都设置了一个时间,时间到了才能从中提取元素

	private static BlockingQueue<MyTask> tasks = new DelayQueue<>();

	static class MyTask implements Delayed{

		long runningTime;
		
		public MyTask(long rt) {
			this.runningTime = rt;
		}

		@Override
		public int compareTo(Delayed o) {
			if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MICROSECONDS)) {
				return -1;
			}else if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) {
				return 1;
			}else {
				return 0;
			}
		}

		@Override
		public long getDelay(TimeUnit unit) {
			return unit.convert(runningTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
		}
		
		@Override
		public String toString() {
			return "" + runningTime;
		}
		
		public static void main(String[] args) throws InterruptedException {
			long now = System.currentTimeMillis();
			MyTask t1 = new MyTask(now+1000);
			MyTask t2 = new MyTask(now+2000);
			MyTask t3 = new MyTask(now+1500);
			MyTask t4 = new MyTask(now+2500);
			MyTask t5 = new MyTask(now+500);
			
			tasks.put(t1);
			tasks.put(t2);
			tasks.put(t3);
			tasks.put(t4);
			tasks.put(t5);
			
			System.out.println(tasks);
			
			for (int i = 0; i < 5; i++) {
				System.out.println(tasks.take());
			}
		}

	}

8、TransferQueue

     和普通的queue的方法差不多,多了一个transfer方法。如果你用这种队列的话,往往是消费者先启动,生产者生产一个东西的时候,他先是去找消费者,如果有消费者就直接丢给消费者。

	public static void main(String[] args) throws InterruptedException {
		LinkedTransferQueue<String> strings = new LinkedTransferQueue<>();
		
		new Thread(()->{
			try {
				System.out.println("t1"+strings.take());
			} catch (Exception e) {
				e.printStackTrace();
			}
		}).start();

		new Thread(()->{
			try {
				System.out.println("t2"+strings.take());
			} catch (Exception e) {
				e.printStackTrace();
			}
		}).start();

		TimeUnit.SECONDS.sleep(2);

		strings.transfer("aaa");
//		strings.put("aaa");
		System.out.println(strings.size());
//		new Thread(()->{
//			try {
//				System.out.println(strings.take());
//			} catch (Exception e) {
//				e.printStackTrace();
//			}
//		}).start();
	}

9、SynchronizedQueue

    同步队列,同步队列是容量为0,也就是来的东西必须给消费掉,首先启动一个消费者,调用add方法,他报错了只能调用put,意思就是阻塞等待消费者消费。put里面其实用的是transfer,任何东西必须消费,不能往容器里面扔。

	public static void main(String[] args) throws InterruptedException {
		BlockingQueue<String> strings = new SynchronousQueue<>();
		
		new Thread(()->{
			try {
				System.out.println(strings.take());
			} catch (Exception e) {
				e.printStackTrace();
			}
		}).start();
//		strings.add("aaa");
		strings.put("aaa");
		strings.put("aaa");
		strings.put("aaa");
		strings.put("aaa");
		strings.put("aaa");
		System.out.println(strings.size());
	}

标题:java中的并发容器
作者:fyzzz
地址:https://fyzzz.cn/articles/2020/04/29/1588143576298.html