aboutsummaryrefslogtreecommitdiff
path: root/src/include
diff options
context:
space:
mode:
authorJohn Naylor <john.naylor@postgresql.org>2022-08-20 21:14:01 -0700
committerJohn Naylor <john.naylor@postgresql.org>2022-08-26 14:03:39 +0700
commite813e0e16852c080259cd0813e1a82ecb2625aea (patch)
treea3c20ae4fbf3eeac56ff1b66e656a25e142ed547 /src/include
parentbcc8b14ef630b2ad9aae7813981fb248fbff9ed8 (diff)
downloadpostgresql-e813e0e16852c080259cd0813e1a82ecb2625aea.tar.gz
postgresql-e813e0e16852c080259cd0813e1a82ecb2625aea.zip
Add optimized functions for linear search within byte arrays
In similar vein to b6ef167564, add pg_lfind8() and pg_lfind8_le() to search for bytes equal or less-than-or-equal to a given byte, respectively. To abstract away platform details, add helper functions and typedefs to simd.h. John Naylor and Nathan Bossart, per suggestion from Andres Freund Discussion: https://www.postgresql.org/message-id/CAFBsxsGzaaGLF%3DNuq61iRXTyspbO9rOjhSqFN%3DV6ozzmta5mXg%40mail.gmail.com
Diffstat (limited to 'src/include')
-rw-r--r--src/include/port/pg_lfind.h68
-rw-r--r--src/include/port/simd.h168
2 files changed, 233 insertions, 3 deletions
diff --git a/src/include/port/pg_lfind.h b/src/include/port/pg_lfind.h
index fb125977b2e..a4e13dffec0 100644
--- a/src/include/port/pg_lfind.h
+++ b/src/include/port/pg_lfind.h
@@ -1,7 +1,8 @@
/*-------------------------------------------------------------------------
*
* pg_lfind.h
- * Optimized linear search routines.
+ * Optimized linear search routines using SIMD intrinsics where
+ * available.
*
* Copyright (c) 2022, PostgreSQL Global Development Group
*
@@ -16,6 +17,70 @@
#include "port/simd.h"
/*
+ * pg_lfind8
+ *
+ * Return true if there is an element in 'base' that equals 'key', otherwise
+ * return false.
+ */
+static inline bool
+pg_lfind8(uint8 key, uint8 *base, uint32 nelem)
+{
+ uint32 i;
+
+ /* round down to multiple of vector length */
+ uint32 tail_idx = nelem & ~(sizeof(Vector8) - 1);
+ Vector8 chunk;
+
+ for (i = 0; i < tail_idx; i += sizeof(Vector8))
+ {
+ vector8_load(&chunk, &base[i]);
+ if (vector8_has(chunk, key))
+ return true;
+ }
+
+ /* Process the remaining elements one at a time. */
+ for (; i < nelem; i++)
+ {
+ if (key == base[i])
+ return true;
+ }
+
+ return false;
+}
+
+/*
+ * pg_lfind8_le
+ *
+ * Return true if there is an element in 'base' that is less than or equal to
+ * 'key', otherwise return false.
+ */
+static inline bool
+pg_lfind8_le(uint8 key, uint8 *base, uint32 nelem)
+{
+ uint32 i;
+
+ /* round down to multiple of vector length */
+ uint32 tail_idx = nelem & ~(sizeof(Vector8) - 1);
+ Vector8 chunk;
+
+ for (i = 0; i < tail_idx; i += sizeof(Vector8))
+ {
+ vector8_load(&chunk, &base[i]);
+ if (vector8_has_le(chunk, key))
+ return true;
+ }
+
+ /* Process the remaining elements one at a time. */
+ for (; i < nelem; i++)
+ {
+ if (base[i] <= key)
+ return true;
+ }
+
+ return false;
+}
+
+/*
* pg_lfind32
*
* Return true if there is an element in 'base' that equals 'key', otherwise
@@ -26,7 +91,6 @@ pg_lfind32(uint32 key, uint32 *base, uint32 nelem)
{
uint32 i = 0;
- /* Use SIMD intrinsics where available. */
#ifdef USE_SSE2
/*
diff --git a/src/include/port/simd.h b/src/include/port/simd.h
index a571e79f574..61e4362258b 100644
--- a/src/include/port/simd.h
+++ b/src/include/port/simd.h
@@ -8,11 +8,17 @@
*
* src/include/port/simd.h
*
+ * NOTES
+ * - VectorN in this file refers to a register where the element operands
+ * are N bits wide. The vector width is platform-specific, so users that care
+ * about that will need to inspect "sizeof(VectorN)".
+ *
*-------------------------------------------------------------------------
*/
#ifndef SIMD_H
#define SIMD_H
+#if (defined(__x86_64__) || defined(_M_AMD64))
/*
* SSE2 instructions are part of the spec for the 64-bit x86 ISA. We assume
* that compilers targeting this architecture understand SSE2 intrinsics.
@@ -22,9 +28,169 @@
* will allow the use of intrinsics that haven't been enabled at compile
* time.
*/
-#if (defined(__x86_64__) || defined(_M_AMD64))
#include <emmintrin.h>
#define USE_SSE2
+typedef __m128i Vector8;
+
+#else
+/*
+ * If no SIMD instructions are available, we can in some cases emulate vector
+ * operations using bitwise operations on unsigned integers.
+ */
+#define USE_NO_SIMD
+typedef uint64 Vector8;
+#endif
+
+
+/* load/store operations */
+static inline void vector8_load(Vector8 *v, const uint8 *s);
+
+/* assignment operations */
+static inline Vector8 vector8_broadcast(const uint8 c);
+
+/* element-wise comparisons to a scalar */
+static inline bool vector8_has(const Vector8 v, const uint8 c);
+static inline bool vector8_has_zero(const Vector8 v);
+static inline bool vector8_has_le(const Vector8 v, const uint8 c);
+
+
+/*
+ * Load a chunk of memory into the given vector.
+ */
+static inline void
+vector8_load(Vector8 *v, const uint8 *s)
+{
+#if defined(USE_SSE2)
+ *v = _mm_loadu_si128((const __m128i *) s);
+#else
+ memcpy(v, s, sizeof(Vector8));
#endif
+}
+
+
+/*
+ * Create a vector with all elements set to the same value.
+ */
+static inline Vector8
+vector8_broadcast(const uint8 c)
+{
+#if defined(USE_SSE2)
+ return _mm_set1_epi8(c);
+#else
+ return ~UINT64CONST(0) / 0xFF * c;
+#endif
+}
+
+/*
+ * Return true if any elements in the vector are equal to the given scalar.
+ */
+static inline bool
+vector8_has(const Vector8 v, const uint8 c)
+{
+ bool result;
+
+ /* pre-compute the result for assert checking */
+#ifdef USE_ASSERT_CHECKING
+ bool assert_result = false;
+
+ for (int i = 0; i < sizeof(Vector8); i++)
+ {
+ if (((const uint8 *) &v)[i] == c)
+ {
+ assert_result = true;
+ break;
+ }
+ }
+#endif /* USE_ASSERT_CHECKING */
+
+#if defined(USE_NO_SIMD)
+ /* any bytes in v equal to c will evaluate to zero via XOR */
+ result = vector8_has_zero(v ^ vector8_broadcast(c));
+#elif defined(USE_SSE2)
+ result = _mm_movemask_epi8(_mm_cmpeq_epi8(v, vector8_broadcast(c)));
+#endif
+
+ Assert(assert_result == result);
+ return result;
+}
+
+/*
+ * Convenience function equivalent to vector8_has(v, 0)
+ */
+static inline bool
+vector8_has_zero(const Vector8 v)
+{
+#if defined(USE_NO_SIMD)
+ /*
+ * We cannot call vector8_has() here, because that would lead to a circular
+ * definition.
+ */
+ return vector8_has_le(v, 0);
+#elif defined(USE_SSE2)
+ return vector8_has(v, 0);
+#endif
+}
+
+/*
+ * Return true if any elements in the vector are less than or equal to the
+ * given scalar.
+ */
+static inline bool
+vector8_has_le(const Vector8 v, const uint8 c)
+{
+ bool result = false;
+#if defined(USE_SSE2)
+ __m128i sub;
+#endif
+
+ /* pre-compute the result for assert checking */
+#ifdef USE_ASSERT_CHECKING
+ bool assert_result = false;
+
+ for (int i = 0; i < sizeof(Vector8); i++)
+ {
+ if (((const uint8 *) &v)[i] <= c)
+ {
+ assert_result = true;
+ break;
+ }
+ }
+#endif /* USE_ASSERT_CHECKING */
+
+#if defined(USE_NO_SIMD)
+
+ /*
+ * To find bytes <= c, we can use bitwise operations to find bytes < c+1,
+ * but it only works if c+1 <= 128 and if the highest bit in v is not set.
+ * Adapted from
+ * https://graphics.stanford.edu/~seander/bithacks.html#HasLessInWord
+ */
+ if ((int64) v >= 0 && c < 0x80)
+ result = (v - vector8_broadcast(c + 1)) & ~v & vector8_broadcast(0x80);
+ else
+ {
+ /* one byte at a time */
+ for (int i = 0; i < sizeof(Vector8); i++)
+ {
+ if (((const uint8 *) &v)[i] <= c)
+ {
+ result = true;
+ break;
+ }
+ }
+ }
+#elif defined(USE_SSE2)
+
+ /*
+ * Use saturating subtraction to find bytes <= c, which will present as
+ * NUL bytes in 'sub'.
+ */
+ sub = _mm_subs_epu8(v, vector8_broadcast(c));
+ result = vector8_has_zero(sub);
+#endif
+
+ Assert(assert_result == result);
+ return result;
+}
#endif /* SIMD_H */