Development of an internal social media platform with personalised dashboards for students
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

sorters.c 15KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542
  1. /*****************************************************************************
  2. Copyright (c) 2002 Zope Foundation and Contributors.
  3. All Rights Reserved.
  4. This software is subject to the provisions of the Zope Public License,
  5. Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
  6. THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
  7. WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
  8. WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
  9. FOR A PARTICULAR PURPOSE
  10. ****************************************************************************/
  11. /* Revision information: $Id$ */
  12. /* The only routine here intended to be used outside the file is
  13. size_t sort_int_nodups(int *p, size_t n)
  14. Sort the array of n ints pointed at by p, in place, and also remove
  15. duplicates. Return the number of unique elements remaining, which occupy
  16. a contiguous and monotonically increasing slice of the array starting at p.
  17. Example: If the input array is [3, 1, 2, 3, 1, 5, 2], sort_int_nodups
  18. returns 4, and the first 4 elements of the array are changed to
  19. [1, 2, 3, 5]. The content of the remaining array positions is not defined.
  20. Notes:
  21. + This is specific to n-byte signed ints, with endianness natural to the
  22. platform. `n` is determined based on ZODB_64BIT_INTS.
  23. + 4*n bytes of available heap memory are required for best speed
  24. (8*n when ZODB_64BIT_INTS is defined).
  25. */
  26. #include <stdlib.h>
  27. #include <stddef.h>
  28. #include <memory.h>
  29. #include <string.h>
  30. #include <assert.h>
  31. /* The type of array elements to be sorted. Most of the routines don't
  32. care about the type, and will work fine for any scalar C type (provided
  33. they're recompiled with element_type appropriately redefined). However,
  34. the radix sort has to know everything about the type's internal
  35. representation.
  36. */
  37. typedef KEY_TYPE element_type;
  38. /* The radixsort is faster than the quicksort for large arrays, but radixsort
  39. has high fixed overhead, making it a poor choice for small arrays. The
  40. crossover point isn't critical, and is sensitive to things like compiler
  41. and machine cache structure, so don't worry much about this.
  42. */
  43. #define QUICKSORT_BEATS_RADIXSORT 800U
  44. /* In turn, the quicksort backs off to an insertion sort for very small
  45. slices. MAX_INSERTION is the largest slice quicksort leaves entirely to
  46. insertion. Because this version of quicksort uses a median-of-3 rule for
  47. selecting a pivot, MAX_INSERTION must be at least 2 (so that quicksort
  48. has at least 3 values to look at in a slice). Again, the exact value here
  49. isn't critical.
  50. */
  51. #define MAX_INSERTION 25U
  52. #if MAX_INSERTION < 2U
  53. # error "MAX_INSERTION must be >= 2"
  54. #endif
  55. /* LSB-first radix sort of the n elements in 'in'.
  56. 'work' is work storage at least as large as 'in'. Depending on how many
  57. swaps are done internally, the final result may come back in 'in' or 'work';
  58. and that pointer is returned.
  59. radixsort_int is specific to signed n-byte ints, with natural machine
  60. endianness. `n` is determined based on ZODB_64BIT_INTS.
  61. */
  62. static element_type*
  63. radixsort_int(element_type *in, element_type *work, size_t n)
  64. {
  65. /* count[i][j] is the number of input elements that have byte value j
  66. in byte position i, where byte position 0 is the LSB. Note that
  67. holding i fixed, the sum of count[i][j] over all j in range(256)
  68. is n.
  69. */
  70. #ifdef ZODB_64BIT_INTS
  71. size_t count[8][256];
  72. #else
  73. size_t count[4][256];
  74. #endif
  75. size_t i;
  76. int offset, offsetinc;
  77. /* Which byte position are we working on now? 0=LSB, 1, 2, ... */
  78. size_t bytenum;
  79. #ifdef ZODB_64BIT_INTS
  80. assert(sizeof(element_type) == 8);
  81. #else
  82. assert(sizeof(element_type) == 4);
  83. #endif
  84. assert(in);
  85. assert(work);
  86. /* Compute all of count in one pass. */
  87. memset(count, 0, sizeof(count));
  88. for (i = 0; i < n; ++i) {
  89. element_type const x = in[i];
  90. ++count[0][(x ) & 0xff];
  91. ++count[1][(x >> 8) & 0xff];
  92. ++count[2][(x >> 16) & 0xff];
  93. ++count[3][(x >> 24) & 0xff];
  94. #ifdef ZODB_64BIT_INTS
  95. ++count[4][(x >> 32) & 0xff];
  96. ++count[5][(x >> 40) & 0xff];
  97. ++count[6][(x >> 48) & 0xff];
  98. ++count[7][(x >> 56) & 0xff];
  99. #endif
  100. }
  101. /* For p an element_type* cast to char*, offset is how much farther we
  102. have to go to get to the LSB of the element; this is 0 for little-
  103. endian boxes and sizeof(element_type)-1 for big-endian.
  104. offsetinc is 1 or -1, respectively, telling us which direction to go
  105. from p+offset to get to the element's more-significant bytes.
  106. */
  107. {
  108. element_type one = 1;
  109. if (*(char*)&one) {
  110. /* Little endian. */
  111. offset = 0;
  112. offsetinc = 1;
  113. }
  114. else {
  115. /* Big endian. */
  116. offset = sizeof(element_type) - 1;
  117. offsetinc = -1;
  118. }
  119. }
  120. /* The radix sort. */
  121. for (bytenum = 0;
  122. bytenum < sizeof(element_type);
  123. ++bytenum, offset += offsetinc) {
  124. /* Do a stable distribution sort on byte position bytenum,
  125. from in to work. index[i] tells us the work index at which
  126. to store the next in element with byte value i. pinbyte
  127. points to the correct byte in the input array.
  128. */
  129. size_t index[256];
  130. unsigned char* pinbyte;
  131. size_t total = 0;
  132. size_t *pcount = count[bytenum];
  133. /* Compute the correct output starting index for each possible
  134. byte value.
  135. */
  136. if (bytenum < sizeof(element_type) - 1) {
  137. for (i = 0; i < 256; ++i) {
  138. const size_t icount = pcount[i];
  139. index[i] = total;
  140. total += icount;
  141. if (icount == n)
  142. break;
  143. }
  144. if (i < 256) {
  145. /* All bytes in the current position have value
  146. i, so there's nothing to do on this pass.
  147. */
  148. continue;
  149. }
  150. }
  151. else {
  152. /* The MSB of signed ints needs to be distributed
  153. differently than the other bytes, in order
  154. 0x80, 0x81, ... 0xff, 0x00, 0x01, ... 0x7f
  155. */
  156. for (i = 128; i < 256; ++i) {
  157. const size_t icount = pcount[i];
  158. index[i] = total;
  159. total += icount;
  160. if (icount == n)
  161. break;
  162. }
  163. if (i < 256)
  164. continue;
  165. for (i = 0; i < 128; ++i) {
  166. const size_t icount = pcount[i];
  167. index[i] = total;
  168. total += icount;
  169. if (icount == n)
  170. break;
  171. }
  172. if (i < 128)
  173. continue;
  174. }
  175. assert(total == n);
  176. /* Distribute the elements according to byte value. Note that
  177. this is where most of the time is spent.
  178. Note: The loop is unrolled 4x by hand, for speed. This
  179. may be a pessimization someday, but was a significant win
  180. on my MSVC 6.0 timing tests.
  181. */
  182. pinbyte = (unsigned char *)in + offset;
  183. i = 0;
  184. /* Reduce number of elements to copy to a multiple of 4. */
  185. while ((n - i) & 0x3) {
  186. unsigned char byte = *pinbyte;
  187. work[index[byte]++] = in[i];
  188. ++i;
  189. pinbyte += sizeof(element_type);
  190. }
  191. for (; i < n; i += 4, pinbyte += 4 * sizeof(element_type)) {
  192. unsigned char byte1 = *(pinbyte );
  193. unsigned char byte2 = *(pinbyte + sizeof(element_type));
  194. unsigned char byte3 = *(pinbyte + 2 * sizeof(element_type));
  195. unsigned char byte4 = *(pinbyte + 3 * sizeof(element_type));
  196. element_type in1 = in[i ];
  197. element_type in2 = in[i+1];
  198. element_type in3 = in[i+2];
  199. element_type in4 = in[i+3];
  200. work[index[byte1]++] = in1;
  201. work[index[byte2]++] = in2;
  202. work[index[byte3]++] = in3;
  203. work[index[byte4]++] = in4;
  204. }
  205. /* Swap in and work (just a pointer swap). */
  206. {
  207. element_type *temp = in;
  208. in = work;
  209. work = temp;
  210. }
  211. }
  212. return in;
  213. }
  214. /* Remove duplicates from sorted array in, storing exactly one of each distinct
  215. element value into sorted array out. It's OK (and expected!) for in == out,
  216. but otherwise the n elements beginning at in must not overlap with the n
  217. beginning at out.
  218. Return the number of elements in out.
  219. */
  220. static size_t
  221. uniq(element_type *out, element_type *in, size_t n)
  222. {
  223. size_t i;
  224. element_type lastelt;
  225. element_type *pout;
  226. assert(out);
  227. assert(in);
  228. if (n == 0)
  229. return 0;
  230. /* i <- first index in 'in' that contains a duplicate.
  231. in[0], in[1], ... in[i-1] are unique, but in[i-1] == in[i].
  232. Set i to n if everything is unique.
  233. */
  234. for (i = 1; i < n; ++i) {
  235. if (in[i-1] == in[i])
  236. break;
  237. }
  238. /* in[:i] is unique; copy to out[:i] if needed. */
  239. assert(i > 0);
  240. if (in != out)
  241. memcpy(out, in, i * sizeof(element_type));
  242. pout = out + i;
  243. lastelt = in[i-1]; /* safe even when i == n */
  244. for (++i; i < n; ++i) {
  245. element_type elt = in[i];
  246. if (elt != lastelt)
  247. *pout++ = lastelt = elt;
  248. }
  249. return pout - out;
  250. }
  251. #if 0
  252. /* insertionsort is no longer referenced directly, but I'd like to keep
  253. * the code here just in case.
  254. */
  255. /* Straight insertion sort of the n elements starting at 'in'. */
  256. static void
  257. insertionsort(element_type *in, size_t n)
  258. {
  259. element_type *p, *q;
  260. element_type minimum; /* smallest seen so far */
  261. element_type *plimit = in + n;
  262. assert(in);
  263. if (n < 2)
  264. return;
  265. minimum = *in;
  266. for (p = in+1; p < plimit; ++p) {
  267. /* *in <= *(in+1) <= ... <= *(p-1). Slide *p into place. */
  268. element_type thiselt = *p;
  269. if (thiselt < minimum) {
  270. /* This is a new minimum. This saves p-in compares
  271. when it happens, but should happen so rarely that
  272. it's not worth checking for its own sake: the
  273. point is that the far more popular 'else' branch can
  274. exploit that thiselt is *not* the smallest so far.
  275. */
  276. memmove(in+1, in, (p - in) * sizeof(*in));
  277. *in = minimum = thiselt;
  278. }
  279. else {
  280. /* thiselt >= minimum, so the loop will find a q
  281. with *q <= thiselt. This saves testing q >= in
  282. on each trip. It's such a simple loop that saving
  283. a per-trip test is a major speed win.
  284. */
  285. for (q = p-1; *q > thiselt; --q)
  286. *(q+1) = *q;
  287. *(q+1) = thiselt;
  288. }
  289. }
  290. }
  291. #endif
  292. /* The maximum number of elements in the pending-work stack quicksort
  293. maintains. The maximum stack depth is approximately log2(n), so
  294. arrays of size up to approximately MAX_INSERTION * 2**STACKSIZE can be
  295. sorted. The memory burden for the stack is small, so better safe than
  296. sorry.
  297. */
  298. #define STACKSIZE 60
  299. /* A _stacknode remembers a contiguous slice of an array that needs to sorted.
  300. lo must be <= hi, and, unlike Python array slices, this includes both ends.
  301. */
  302. struct _stacknode {
  303. element_type *lo;
  304. element_type *hi;
  305. };
  306. static void
  307. quicksort(element_type *plo, size_t n)
  308. {
  309. element_type *phi;
  310. /* Swap two array elements. */
  311. element_type _temp;
  312. #define SWAP(P, Q) (_temp = *(P), *(P) = *(Q), *(Q) = _temp)
  313. /* Stack of pending array slices to be sorted. */
  314. struct _stacknode stack[STACKSIZE];
  315. struct _stacknode *stackfree = stack; /* available stack slot */
  316. /* Push an array slice on the pending-work stack. */
  317. #define PUSH(PLO, PHI) \
  318. do { \
  319. assert(stackfree - stack < STACKSIZE); \
  320. assert((PLO) <= (PHI)); \
  321. stackfree->lo = (PLO); \
  322. stackfree->hi = (PHI); \
  323. ++stackfree; \
  324. } while(0)
  325. assert(plo);
  326. phi = plo + n - 1;
  327. for (;;) {
  328. element_type pivot;
  329. element_type *pi, *pj;
  330. assert(plo <= phi);
  331. n = phi - plo + 1;
  332. if (n <= MAX_INSERTION) {
  333. /* Do a small insertion sort. Contra Knuth, we do
  334. this now instead of waiting until the end, because
  335. this little slice is likely still in cache now.
  336. */
  337. element_type *p, *q;
  338. element_type minimum = *plo;
  339. for (p = plo+1; p <= phi; ++p) {
  340. /* *plo <= *(plo+1) <= ... <= *(p-1).
  341. Slide *p into place. */
  342. element_type thiselt = *p;
  343. if (thiselt < minimum) {
  344. /* New minimum. */
  345. memmove(plo+1,
  346. plo,
  347. (p - plo) * sizeof(*p));
  348. *plo = minimum = thiselt;
  349. }
  350. else {
  351. /* thiselt >= minimum, so the loop will
  352. find a q with *q <= thiselt.
  353. */
  354. for (q = p-1; *q > thiselt; --q)
  355. *(q+1) = *q;
  356. *(q+1) = thiselt;
  357. }
  358. }
  359. /* Pop another slice off the stack. */
  360. if (stack == stackfree)
  361. break; /* no more slices -- we're done */
  362. --stackfree;
  363. plo = stackfree->lo;
  364. phi = stackfree->hi;
  365. continue;
  366. }
  367. /* Parition the slice.
  368. For pivot, take the median of the leftmost, rightmost, and
  369. middle elements. First sort those three; then the median
  370. is the middle one. For technical reasons, the middle
  371. element is swapped to plo+1 first (see Knuth Vol 3 Ed 2
  372. section 5.2.2 exercise 55 -- reverse-sorted arrays can
  373. take quadratic time otherwise!).
  374. */
  375. {
  376. element_type *plop1 = plo + 1;
  377. element_type *pmid = plo + (n >> 1);
  378. assert(plo < pmid && pmid < phi);
  379. SWAP(plop1, pmid);
  380. /* Sort plo, plop1, phi. */
  381. /* Smaller of rightmost two -> middle. */
  382. if (*plop1 > *phi)
  383. SWAP(plop1, phi);
  384. /* Smallest of all -> left; if plo is already the
  385. smallest, the sort is complete.
  386. */
  387. if (*plo > *plop1) {
  388. SWAP(plo, plop1);
  389. /* Largest of all -> right. */
  390. if (*plop1 > *phi)
  391. SWAP(plop1, phi);
  392. }
  393. pivot = *plop1;
  394. pi = plop1;
  395. }
  396. assert(*plo <= pivot);
  397. assert(*pi == pivot);
  398. assert(*phi >= pivot);
  399. pj = phi;
  400. /* Partition wrt pivot. This is the time-critical part, and
  401. nearly every decision in the routine aims at making this
  402. loop as fast as possible -- even small points like
  403. arranging that all loop tests can be done correctly at the
  404. bottoms of loops instead of the tops, and that pointers can
  405. be derefenced directly as-is (without fiddly +1 or -1).
  406. The aim is to make the C here so simple that a compiler
  407. has a good shot at doing as well as hand-crafted assembler.
  408. */
  409. for (;;) {
  410. /* Invariants:
  411. 1. pi < pj.
  412. 2. All elements at plo, plo+1 .. pi are <= pivot.
  413. 3. All elements at pj, pj+1 .. phi are >= pivot.
  414. 4. There is an element >= pivot to the right of pi.
  415. 5. There is an element <= pivot to the left of pj.
  416. Note that #4 and #5 save us from needing to check
  417. that the pointers stay in bounds.
  418. */
  419. assert(pi < pj);
  420. do { ++pi; } while (*pi < pivot);
  421. assert(pi <= pj);
  422. do { --pj; } while (*pj > pivot);
  423. assert(pj >= pi - 1);
  424. if (pi < pj)
  425. SWAP(pi, pj);
  426. else
  427. break;
  428. }
  429. assert(plo+1 < pi && pi <= phi);
  430. assert(plo < pj && pj < phi);
  431. assert(*pi >= pivot);
  432. assert( (pi == pj && *pj == pivot) ||
  433. (pj + 1 == pi && *pj <= pivot) );
  434. /* Swap pivot into its final position, pj. */
  435. assert(plo[1] == pivot);
  436. plo[1] = *pj;
  437. *pj = pivot;
  438. /* Subfiles are from plo to pj-1 inclusive, and pj+1 to phi
  439. inclusive. Push the larger one, and loop back to do the
  440. smaller one directly.
  441. */
  442. if (pj - plo >= phi - pj) {
  443. PUSH(plo, pj-1);
  444. plo = pj+1;
  445. }
  446. else {
  447. PUSH(pj+1, phi);
  448. phi = pj-1;
  449. }
  450. }
  451. #undef PUSH
  452. #undef SWAP
  453. }
  454. /* Sort p and remove duplicates, as fast as we can. */
  455. static size_t
  456. sort_int_nodups(KEY_TYPE *p, size_t n)
  457. {
  458. size_t nunique;
  459. element_type *work;
  460. assert(sizeof(KEY_TYPE) == sizeof(element_type));
  461. assert(p);
  462. /* Use quicksort if the array is small, OR if malloc can't find
  463. enough temp memory for radixsort.
  464. */
  465. work = NULL;
  466. if (n > QUICKSORT_BEATS_RADIXSORT)
  467. work = (element_type *)malloc(n * sizeof(element_type));
  468. if (work) {
  469. element_type *out = radixsort_int(p, work, n);
  470. nunique = uniq(p, out, n);
  471. free(work);
  472. }
  473. else {
  474. quicksort(p, n);
  475. nunique = uniq(p, p, n);
  476. }
  477. return nunique;
  478. }