快排源码分析

以Arrays.sort()为例进行分析。

快排基本原理

简介

快速排序,是一种比较/交换排序算法,最早由英国的东尼·霍尔提出(另外一个知名贡献是提出了和操作系统相关的哲学家进餐问题,首次提出了管程的概念,实现共享资源的互斥访问)。

时间复杂度:快排的平均时间复杂度是O(nlogn),最坏时间复杂度是O(n^2)

基本思想:使用分治法将一个序列分为两个子序列,然后递归地排序两个子序列。

步骤:

  • 挑选基准值:从序列中选出一个元素,作为基准(pivot)
  • 分割:所有比基准值小的元素放在基准前面,所有比基准值大的元素放在基准后面(与基准值相等的可以放到任何一边)
  • 递归排序子序列:递归地将小于基准值元素的子序列和大于基准值的子序列进行排序

递归/非递归实现

递归实现

1
2
3
4
5
6
7
8
9
10
11
12
13
void quicksort(int[] arr,int left,int right){
if(left >= right) return;
int tmp = arr[left],l = left,r = right;
while(l<r){
while(l<r && tmp<=arr[r]) r--;
arr[l] = arr[r];
while(l<r && tmp>=arr[l]) l++;
arr[r] = arr[l];
}
arr[l] = tmp;
quicksort(arr,left,l-1);
quicksort(arr,l+1,right);
}

非递归实现

非递归借用栈实现,栈中存放的是待排序区间的左右坐标。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
void quickSort(int[] arr,int left,int right) {
Deque<Integer> s = new ArrayDeque<>();
s.push(left);
s.push(right);
while(!s.isEmpty()) {
int rindex = s.poll(),lindex = s.poll();
int l = lindex,r = rindex;
int tmp = arr[l];
while(l<r&&arr[r]>=tmp) r--;
arr[l] = arr[r];
while(l<r&&arr[l]<=tmp) l++;
arr[r] = arr[l];
arr[l] = tmp;
if(l-1>lindex) {
s.push(lindex);
s.push(l-1);
}
if(l+1<rindex) {
s.push(l+1);
s.push(rindex);
}
}
}

快排实现稳定排序

快排是一种不稳定的排序,因为每次交换的过程中可能导致相等值的前后顺序改变。

为了实现稳定排序,可以使用一个辅助数组,从前往后把小于基准数的放进辅助数组,再把基准数放进去,然后再遍历一遍把大于等于基准数的放进辅助数组,避免交换操作就可以稳定了。

这种增加额外空间的方式虽然可以实现稳定排序,但是已经不是严格意义的快速排序了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void quicksort(int[] arr,int left,int right){
if(left>=right) return;
int len = right-left+1;
int[] tmp = new int[len];
int target = arr[left];
int cnt = 0,mid = 0;
for(int i=left+1;i<=right;i++){
if(arr[i]<target) tmp[cnt++] = arr[i];
}
tmp[cnt++] = target;
mid = left+cnt-1;
for(int i=left+1;i<=right;i++){
if(arr[i]>=target) tmp[cnt++] = arr[i];
}
for(int i=left;i<=right;i++){
arr[i] = tmp[i-left];
}
quicksort(arr,left,mid-1);
quicksort(arr,mid+1,right);
}

时间复杂度推导

快排涉及到递归调用,递归调用的通用时间复杂度公式是T(n) = aT(n/b) + f(n),其中a,b是常数,a可以理解为每次分割成几部分,n/b可以理解为子部分分割后的大小,f(n)是每次递归调用需要额外的计算执行时间。

对于快速排序,每次分割成两部分(a=2),假设每次近似平均分成两半(b=2),每次递归中用于交换的额外执行时间是线性的(f(n)),于是有:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
T(n) = 2T(n/2) + n
= 2(2T(n/4) + n/2) + n
= 2^2T(n/2^2) + 2n
= ...

令2^m = n,则有

T(n) = 2^mT(1) + mn

因为T(1) = 1,m = logn

所以T(n) = n + nlogn

当n >= 2时,nlogn >= n

所以时间复杂度是O(nlogn)

以上是一个最优时间复杂度的推导,即假设每次平均分成了相等长度的两半,更一般的推导参考链接,这里给出精确计算结果(C(n) = 2nlnn = 1.39nlog2(n))

Arrays.sort()分析

基本思想

Java中的Arrays.sort()是一种混合排序,采用了插入排序、双插入排序、单轴快排、双轴快排、归并排序的思想,针对不同长度数组、重复元素进行了特殊优化。

算法流程:

源码分析

参数解释

进入Arrays.sort()可以看到调用了DualPivotQuicksort类内部的静态方法sort。

1
2
3
4
5
6
7
8
9
10
11
/**
* 参数int[]代表待排序数组
* 参数left表示起始元素下标
* 参数right表示终止元素下标
* 参数work表示切片
* 参数workBase表示切片中可用空间的起始索引
* 参数workLen表示切片中可用空间大小
*/
public static void sort(int[] a) {
DualPivotQuicksort.sort(a, 0, a.length - 1, null, 0, 0);
}

DualPivotQuicksort类中的一些参数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* 归并排序中最大运行数量
*/
private static final int MAX_RUN_COUNT = 67;

/**
* 归并排序中的数组最大长度
*/
private static final int MAX_RUN_LENGTH = 33;

/**
* 快排的阈值,如果小于该长度,则采用快排,否则采用归并排序
*/
private static final int QUICKSORT_THRESHOLD = 286;

/**
* 插入排序的阈值,如果小于该长度,则采用插入排序
*/
private static final int INSERTION_SORT_THRESHOLD = 47;

/**
* 计数排序的阈值,如果一个字节数组的长度小于该长度,则采用计数排序
*/
private static final int COUNTING_SORT_THRESHOLD_FOR_BYTE = 29;

/**
* 如果短数组或字符数组的长度大于该长度,则采用计数排序
*/
private static final int COUNTING_SORT_THRESHOLD_FOR_SHORT_OR_CHAR = 3200;

大数组归并排序

静态内部类sort,长度小于286采用快排,第四个参数代表是否在区间最左侧

1
2
3
4
5
// Use Quicksort on small arrays
if (right - left < QUICKSORT_THRESHOLD) {
sort(a, left, right, true);
return;
}

对于长度大于等于286的,检查数组是否接近有序,如果接近有序,采用归并排序,否则采用快排。(是否接近有序的判别标准是MAX_RUN_COUNT和MAX_RUN_LENGTH两个参数)

使用归并排序的条件:

  • 数组长度
  • 数组局部有序
  • 不存在较多连续相等的元素

使用run记录有序子序列的坐标范围,如run[0]==0 && run[1]==3表示第一段有序序列的其实索引是0,终止索引是2,count记录有序子序列的个数,如果count值过大,表示平均有序子序列较短,数组基本无序,此时应该采用快排

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
int[] run = new int[MAX_RUN_COUNT + 1];
int count = 0; run[0] = left;

// Check if the array is nearly sorted
for (int k = left; k < right; run[count] = k) {
if (a[k] < a[k + 1]) { // ascending
while (++k <= right && a[k - 1] <= a[k]);
} else if (a[k] > a[k + 1]) { // descending
while (++k <= right && a[k - 1] >= a[k]);
for (int lo = run[count] - 1, hi = k; ++lo < --hi; ) {
int t = a[lo]; a[lo] = a[hi]; a[hi] = t;
}
} else { // equal
for (int m = MAX_RUN_LENGTH; ++k <= right && a[k - 1] == a[k]; ) {
if (--m == 0) {
sort(a, left, right, true);
return;
}
}
}

/*
* The array is not highly structured,
* use Quicksort instead of merge sort.
*/
if (++count == MAX_RUN_COUNT) {
sort(a, left, right, true);
return;
}
}

小数组插入排序

双轴快排,进入快速排序的sort:

如果长度小于47,使用插入排序。如果属于最左区间,采用普通的插入排序;如果不属于最左区间,采用双插入排序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
// Use insertion sort on tiny arrays
if (length < INSERTION_SORT_THRESHOLD) {
if (leftmost) {
/*
* Traditional (without sentinel) insertion sort,
* optimized for server VM, is used in case of
* the leftmost part.
*/
for (int i = left, j = i; i < right; j = ++i) {
int ai = a[i + 1];
while (ai < a[j]) {
a[j + 1] = a[j];
if (j-- == left) {
break;
}
}
a[j + 1] = ai;
}
} else {
/*
* Skip the longest ascending sequence.
*/
do {
if (left >= right) {
return;
}
} while (a[++left] >= a[left - 1]);

/*
* Every element from adjoining part plays the role
* of sentinel, therefore this allows us to avoid the
* left range check on each iteration. Moreover, we use
* the more optimized algorithm, so called pair insertion
* sort, which is faster (in the context of Quicksort)
* than traditional implementation of insertion sort.
*/
for (int k = left; ++left <= right; k = ++left) {
int a1 = a[k], a2 = a[left];

if (a1 < a2) {
a2 = a1; a1 = a[left];
}
while (a1 < a[--k]) {
a[k + 2] = a[k];
}
a[++k + 1] = a1;

while (a2 < a[--k]) {
a[k + 1] = a[k];
}
a[k + 1] = a2;
}
int last = a[right];

while (last < a[--right]) {
a[right + 1] = a[right];
}
a[right + 1] = last;
}
return;
}

双轴快排

选取基准

首先计算近似长度的1/7,然后选取五个经验点/中心点,对五个经验点进行插入排序。

如果五个经验点/中心点各不相等,则 从中选择两个(第二个和第四个)作为基准。如果五个经验点/中心点其中有两个相等,则采用传统的单轴快排(一个基准)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// Inexpensive approximation of length / 7
int seventh = (length >> 3) + (length >> 6) + 1;

/*
* Sort five evenly spaced elements around (and including) the
* center element in the range. These elements will be used for
* pivot selection as described below. The choice for spacing
* these elements was empirically determined to work well on
* a wide variety of inputs.
*/
int e3 = (left + right) >>> 1; // The midpoint
int e2 = e3 - seventh;
int e1 = e2 - seventh;
int e4 = e3 + seventh;
int e5 = e4 + seventh;

// Sort these elements using insertion sort
if (a[e2] < a[e1]) { int t = a[e2]; a[e2] = a[e1]; a[e1] = t; }

if (a[e3] < a[e2]) { int t = a[e3]; a[e3] = a[e2]; a[e2] = t;
if (t < a[e1]) { a[e2] = a[e1]; a[e1] = t; }
}
if (a[e4] < a[e3]) { int t = a[e4]; a[e4] = a[e3]; a[e3] = t;
if (t < a[e2]) { a[e3] = a[e2]; a[e2] = t;
if (t < a[e1]) { a[e2] = a[e1]; a[e1] = t; }
}
}
if (a[e5] < a[e4]) { int t = a[e5]; a[e5] = a[e4]; a[e4] = t;
if (t < a[e3]) { a[e4] = a[e3]; a[e3] = t;
if (t < a[e2]) { a[e3] = a[e2]; a[e2] = t;
if (t < a[e1]) { a[e2] = a[e1]; a[e1] = t; }
}
}
}

定义两个指针,less,great,less从左到右遍历找到第一个大于等于pivot1的元素,great从右向左遍历找到第一个小于等于pivot2的元素。k从less-1遍历到great,把小于pivot1的元素放到左边,把大于pivot2的元素放到右边。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
/*
* Skip elements, which are less or greater than pivot values.
*/
while (a[++less] < pivot1);
while (a[--great] > pivot2);

/*
* Partitioning:
*
* left part center part right part
* +--------------------------------------------------------------+
* | < pivot1 | pivot1 <= && <= pivot2 | ? | > pivot2 |
* +--------------------------------------------------------------+
* ^ ^ ^
* | | |
* less k great
*
* Invariants:
*
* all in (left, less) < pivot1
* pivot1 <= all in [less, k) <= pivot2
* all in (great, right) > pivot2
*
* Pointer k is the first index of ?-part.
*/
outer:
for (int k = less - 1; ++k <= great; ) {
int ak = a[k];
if (ak < pivot1) { // Move a[k] to left part
a[k] = a[less];
/*
* Here and below we use "a[i] = b; i++;" instead
* of "a[i++] = b;" due to performance issue.
*/
a[less] = ak;
++less;
} else if (ak > pivot2) { // Move a[k] to right part
while (a[great] > pivot2) {
if (great-- == k) {
break outer;
}
}
if (a[great] < pivot1) { // a[great] <= pivot2
a[k] = a[less];
a[less] = a[great];
++less;
} else { // pivot1 <= a[great] <= pivot2
a[k] = a[great];
}
/*
* Here and below we use "a[i] = b; i--;" instead
* of "a[i--] = b;" due to performance issue.
*/
a[great] = ak;
--great;
}
}

交换基准,递归左边和右边

1
2
3
4
5
6
7
// Swap pivots into their final positions
a[left] = a[less - 1]; a[less - 1] = pivot1;
a[right] = a[great + 1]; a[great + 1] = pivot2;

// Sort left and right parts recursively, excluding known pivots
sort(a, left, less - 2, leftmost);
sort(a, great + 2, right, false);

对于中间部分 ,如果长度超过4/7,需要进行预处理,把等于pivot1的放左边,把等于pivot2的放右边,然后再参与快排。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
/*
* If center part is too large (comprises > 4/7 of the array),
* swap internal pivot values to ends.
*/
if (less < e1 && e5 < great) {
/*
* Skip elements, which are equal to pivot values.
*/
while (a[less] == pivot1) {
++less;
}

while (a[great] == pivot2) {
--great;
}

/*
* Partitioning:
*
* left part center part right part
* +----------------------------------------------------------+
* | == pivot1 | pivot1 < && < pivot2 | ? | == pivot2 |
* +----------------------------------------------------------+
* ^ ^ ^
* | | |
* less k great
*
* Invariants:
*
* all in (*, less) == pivot1
* pivot1 < all in [less, k) < pivot2
* all in (great, *) == pivot2
*
* Pointer k is the first index of ?-part.
*/
outer:
for (int k = less - 1; ++k <= great; ) {
int ak = a[k];
if (ak == pivot1) { // Move a[k] to left part
a[k] = a[less];
a[less] = ak;
++less;
} else if (ak == pivot2) { // Move a[k] to right part
while (a[great] == pivot2) {
if (great-- == k) {
break outer;
}
}
if (a[great] == pivot1) { // a[great] < pivot2
a[k] = a[less];
/*
* Even though a[great] equals to pivot1, the
* assignment a[less] = pivot1 may be incorrect,
* if a[great] and pivot1 are floating-point zeros
* of different signs. Therefore in float and
* double sorting methods we have to use more
* accurate assignment a[less] = a[great].
*/
a[less] = pivot1;
++less;
} else { // pivot1 < a[great] < pivot2
a[k] = a[great];
}
a[great] = ak;
--great;
}
}
}

// Sort center part recursively
sort(a, less, great, false);

优化点总结

  • 小数组的优化:对于小数组来说,插入排序的效率更高,当前数组长度小于47时,使用插入排序代替快排,提升性能
  • 特殊大数组的优化:由于计数排序特别适合待排序数组很长,类型本身可表示的值相对较少的情况,如byte、char数组 ,对于这样的大数组采用计数排序可以提升性能
  • 双轴快排:使用两个基准,分成三段,在没有明显增加比较次数的情况下减少了递归的次数。
  • 针对重复数据优化,避免了不必要的交换和递归

性能测试

不同数据量测试

构造大小为1k,5k,1w,5w,10w的数组分别对Arrays.sort()、快排递归、快排非递归进行测试。

Arrays.sort() 快排递归 快排非递归
1k 1ms 2ms 2ms
5k 1ms 5ms 7ms
1w 2ms - 26ms
5w 7ms - 557ms
10w 9ms - 2182ms

其中快排递归在数据量达到1w的时候idea报错StackOverflowError,栈溢出。

测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
int len = 100000;
int[] arr = new int[len];
Random rd = new Random();
for(int i=0;i<len;i++) {
arr[i] = rd.nextInt(1000000);
}
long start1 = System.currentTimeMillis();
//Arrays.sort()排序
Arrays.sort(arr);
long end1 = System.currentTimeMillis();

long start2 = System.currentTimeMillis();
//快排递归
quickSort1(arr,0,arr.length-1);
long end2 = System.currentTimeMillis();

long start3 = System.currentTimeMillis();
//快排非递归
quickSort2(arr,0,arr.length-1);
long end3 = System.currentTimeMillis();

System.out.println("算法1(Arrays.sort):");
System.out.println(end1-start1);
System.out.println("算法2(快排递归):");
System.out.println(end2-start2);
System.out.println("算法3(快排非递归):");
System.out.println(end3-start3);

顺序/逆序/乱序测试

数据量5k:

Arrays.sort() 快排递归 快排非递归
顺序 0ms 5ms 8ms
逆序 0ms 5ms 7ms
乱序 1ms 5ms 7ms

从表中看不出差别。

数据量10w:

Arrays.sort() 快排非递归
顺序 1ms 2135ms
逆序 2ms 2121ms
乱序 10ms 2178ms

从顺序、逆序来看也没有大的差别,但Arrays.sort()明显要优于原始的快排。

参考

[1] 快速排序,维基百科,https://zh.m.wikipedia.org/zh/%E5%BF%AB%E9%80%9F%E6%8E%92%E5%BA%8F

[2] 快排时间复杂度推导,http://t.zoukankan.com/javawebsoa-p-3194015.html

[3] jdk1.8源码

[4] Why Is Dual-Pivot Quicksort Fast(为什么双轴排序更快),https://arxiv.org/pdf/1511.01138.pdf