aboutsummaryrefslogtreecommitdiff
path: root/contrib/tsm_system_time/tsm_system_time.c
blob: efb127c1d573374cfd110e283cc4f91a91298b9f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
/*-------------------------------------------------------------------------
 *
 * tsm_system_time.c
 *	  interface routines for system_time tablesample method
 *
 *
 * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *	  contrib/tsm_system_time_rowlimit/tsm_system_time.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "fmgr.h"

#include "access/tablesample.h"
#include "access/relscan.h"
#include "miscadmin.h"
#include "nodes/execnodes.h"
#include "nodes/relation.h"
#include "optimizer/clauses.h"
#include "storage/bufmgr.h"
#include "utils/sampling.h"
#include "utils/spccache.h"
#include "utils/timestamp.h"

PG_MODULE_MAGIC;

/*
 * State
 */
typedef struct
{
	SamplerRandomState randstate;
	uint32			seed;			/* random seed */
	BlockNumber		nblocks;		/* number of block in relation */
	int32			time;			/* time limit for sampling */
	TimestampTz		start_time;		/* start time of sampling */
	TimestampTz		end_time;		/* end time of sampling */
	OffsetNumber	lt;				/* last tuple returned from current block */
	BlockNumber		step;			/* step size */
	BlockNumber		lb;				/* last block visited */
	BlockNumber		estblocks;		/* estimated number of returned blocks (moving) */
	BlockNumber		doneblocks;		/* number of already returned blocks */
} SystemSamplerData;


PG_FUNCTION_INFO_V1(tsm_system_time_init);
PG_FUNCTION_INFO_V1(tsm_system_time_nextblock);
PG_FUNCTION_INFO_V1(tsm_system_time_nexttuple);
PG_FUNCTION_INFO_V1(tsm_system_time_end);
PG_FUNCTION_INFO_V1(tsm_system_time_reset);
PG_FUNCTION_INFO_V1(tsm_system_time_cost);

static uint32 random_relative_prime(uint32 n, SamplerRandomState randstate);

/*
 * Initializes the state.
 */
Datum
tsm_system_time_init(PG_FUNCTION_ARGS)
{
	TableSampleDesc	   *tsdesc = (TableSampleDesc *) PG_GETARG_POINTER(0);
	uint32				seed = PG_GETARG_UINT32(1);
	int32				time = PG_ARGISNULL(2) ? -1 : PG_GETARG_INT32(2);
	HeapScanDesc		scan = tsdesc->heapScan;
	SystemSamplerData  *sampler;

	if (time < 1)
		ereport(ERROR,
				(errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
				 errmsg("invalid time limit"),
				 errhint("Time limit must be positive integer value.")));

	sampler = palloc0(sizeof(SystemSamplerData));

	/* Remember initial values for reinit */
	sampler->seed = seed;
	sampler->nblocks = scan->rs_nblocks;
	sampler->lt = InvalidOffsetNumber;
	sampler->estblocks = 2;
	sampler->doneblocks = 0;
	sampler->time = time;
	sampler->start_time = GetCurrentTimestamp();
	sampler->end_time = TimestampTzPlusMilliseconds(sampler->start_time,
													sampler->time);

	sampler_random_init_state(sampler->seed, sampler->randstate);

	/* Find relative prime as step size for linear probing. */
	sampler->step = random_relative_prime(sampler->nblocks, sampler->randstate);
	/*
	 * Randomize start position so that blocks close to step size don't have
	 * higher probability of being chosen on very short scan.
	 */
	sampler->lb = sampler_random_fract(sampler->randstate) * (sampler->nblocks / sampler->step);

	tsdesc->tsmdata = (void *) sampler;

	PG_RETURN_VOID();
}

/*
 * Get next block number or InvalidBlockNumber when we're done.
 *
 * Uses linear probing algorithm for picking next block.
 */
Datum
tsm_system_time_nextblock(PG_FUNCTION_ARGS)
{
	TableSampleDesc	   *tsdesc = (TableSampleDesc *) PG_GETARG_POINTER(0);
	SystemSamplerData  *sampler = (SystemSamplerData *) tsdesc->tsmdata;

	sampler->lb = (sampler->lb + sampler->step) % sampler->nblocks;
	sampler->doneblocks++;

	/* All blocks have been read, we're done */
	if (sampler->doneblocks > sampler->nblocks)
		PG_RETURN_UINT32(InvalidBlockNumber);

	/*
	 * Update the estimations for time limit at least 10 times per estimated
	 * number of returned blocks to handle variations in block read speed.
	 */
	if (sampler->doneblocks % Max(sampler->estblocks/10, 1) == 0)
	{
		TimestampTz	now = GetCurrentTimestamp();
		long        secs;
		int         usecs;
		int			usecs_remaining;
		int			time_per_block;

		TimestampDifference(sampler->start_time, now, &secs, &usecs);
		usecs += (int) secs * 1000000;

		time_per_block = usecs / sampler->doneblocks;

		/* No time left, end. */
		TimestampDifference(now, sampler->end_time, &secs, &usecs);
		if (secs <= 0 && usecs <= 0)
			PG_RETURN_UINT32(InvalidBlockNumber);

		/* Remaining microseconds */
		usecs_remaining = usecs + (int) secs * 1000000;

		/* Recalculate estimated returned number of blocks */
		if (time_per_block < usecs_remaining && time_per_block > 0)
			sampler->estblocks = sampler->time * time_per_block;
	}

	PG_RETURN_UINT32(sampler->lb);
}

/*
 * Get next tuple offset in current block or InvalidOffsetNumber if we are done
 * with this block.
 */
Datum
tsm_system_time_nexttuple(PG_FUNCTION_ARGS)
{
	TableSampleDesc	   *tsdesc = (TableSampleDesc *) PG_GETARG_POINTER(0);
	OffsetNumber		maxoffset = PG_GETARG_UINT16(2);
	SystemSamplerData  *sampler = (SystemSamplerData *) tsdesc->tsmdata;
	OffsetNumber		tupoffset = sampler->lt;

	if (tupoffset == InvalidOffsetNumber)
		tupoffset = FirstOffsetNumber;
	else
		tupoffset++;

	if (tupoffset > maxoffset)
		tupoffset = InvalidOffsetNumber;

	sampler->lt = tupoffset;

	PG_RETURN_UINT16(tupoffset);
}

/*
 * Cleanup method.
 */
Datum
tsm_system_time_end(PG_FUNCTION_ARGS)
{
	TableSampleDesc *tsdesc = (TableSampleDesc *) PG_GETARG_POINTER(0);

	pfree(tsdesc->tsmdata);

	PG_RETURN_VOID();
}

/*
 * Reset state (called by ReScan).
 */
Datum
tsm_system_time_reset(PG_FUNCTION_ARGS)
{
	TableSampleDesc	   *tsdesc = (TableSampleDesc *) PG_GETARG_POINTER(0);
	SystemSamplerData  *sampler = (SystemSamplerData *) tsdesc->tsmdata;

	sampler->lt = InvalidOffsetNumber;
	sampler->start_time = GetCurrentTimestamp();
	sampler->end_time = TimestampTzPlusMilliseconds(sampler->start_time,
													sampler->time);
	sampler->estblocks = 2;
	sampler->doneblocks = 0;

	sampler_random_init_state(sampler->seed, sampler->randstate);
	sampler->step = random_relative_prime(sampler->nblocks, sampler->randstate);
	sampler->lb = sampler_random_fract(sampler->randstate) * (sampler->nblocks / sampler->step);

	PG_RETURN_VOID();
}

/*
 * Costing function.
 */
Datum
tsm_system_time_cost(PG_FUNCTION_ARGS)
{
	PlannerInfo	   *root = (PlannerInfo *) PG_GETARG_POINTER(0);
	Path		   *path = (Path *) PG_GETARG_POINTER(1);
	RelOptInfo	   *baserel = (RelOptInfo *) PG_GETARG_POINTER(2);
	List		   *args = (List *) PG_GETARG_POINTER(3);
	BlockNumber	   *pages = (BlockNumber *) PG_GETARG_POINTER(4);
	double		   *tuples = (double *) PG_GETARG_POINTER(5);
	Node		   *limitnode;
	int32			time;
	BlockNumber		relpages;
	double			reltuples;
	double			density;
	double			spc_random_page_cost;

	limitnode = linitial(args);
	limitnode = estimate_expression_value(root, limitnode);

	if (IsA(limitnode, RelabelType))
		limitnode = (Node *) ((RelabelType *) limitnode)->arg;

	if (IsA(limitnode, Const))
		time = DatumGetInt32(((Const *) limitnode)->constvalue);
	else
	{
		/* Default time (1s) if the estimation didn't return Const. */
		time = 1000;
	}

	relpages = baserel->pages;
	reltuples = baserel->tuples;

	/* estimate the tuple density */
	if (relpages > 0)
		density = reltuples / (double) relpages;
	else
		density = (BLCKSZ - SizeOfPageHeaderData) / baserel->width;

	/*
	 * We equal random page cost value to number of ms it takes to read the
	 * random page here which is far from accurate but we don't have anything
	 * better to base our predicted page reads.
	 */
	get_tablespace_page_costs(baserel->reltablespace,
							  &spc_random_page_cost,
							  NULL);

	/*
	 * Assumption here is that we'll never read less then 1% of table pages,
	 * this is here mainly because it is much less bad to overestimate than
	 * underestimate and using just spc_random_page_cost will probably lead
	 * to underestimations in general.
	 */
	*pages = Min(baserel->pages, Max(time/spc_random_page_cost, baserel->pages/100));
	*tuples = rint(density * (double) *pages * path->rows / baserel->tuples);
	path->rows = *tuples;

	PG_RETURN_VOID();
}

static uint32
gcd (uint32 a, uint32 b)
{
	uint32 c;

	while (a != 0)
	{
		c = a;
		a = b % a;
		b = c;
	}

	return b;
}

static uint32
random_relative_prime(uint32 n, SamplerRandomState randstate)
{
	/* Pick random starting number, with some limits on what it can be. */
	uint32 r = (uint32) sampler_random_fract(randstate) * n/2 + n/4,
		   t;

	/*
	 * This should only take 2 or 3 iterations as the probability of 2 numbers
	 * being relatively prime is ~61%.
	 */
	while ((t = gcd(r, n)) > 1)
	{
		CHECK_FOR_INTERRUPTS();
		r /= t;
	}

	return r;
}