comparison rrd-combine/rrd-combine.c @ 0:c7f6b056b673

First import of vendor version
author Peter Gervai <grin@grin.hu>
date Tue, 10 Mar 2009 13:49:58 +0100
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:c7f6b056b673
1 /* Distributed Checksum Clearinghouse
2 *
3 * combine RRD databases
4 *
5 * Copyright (c) 2008 by Rhyolite Software, LLC
6 *
7 * This agreement is not applicable to any entity which sells anti-spam
8 * solutions to others or provides an anti-spam solution as part of a
9 * security solution sold to other entities, or to a private network
10 * which employs the DCC or uses data provided by operation of the DCC
11 * but does not provide corresponding data to other users.
12 *
13 * Permission to use, copy, modify, and distribute this software without
14 * changes for any purpose with or without fee is hereby granted, provided
15 * that the above copyright notice and this permission notice appear in all
16 * copies and any distributed versions or copies are either unchanged
17 * or not called anything similar to "DCC" or "Distributed Checksum
18 * Clearinghouse".
19 *
20 * Parties not eligible to receive a license under this agreement can
21 * obtain a commercial license to use DCC by contacting Rhyolite Software
22 * at sales@rhyolite.com.
23 *
24 * A commercial license would be for Distributed Checksum and Reputation
25 * Clearinghouse software. That software includes additional features. This
26 * free license for Distributed ChecksumClearinghouse Software does not in any
27 * way grant permision to use Distributed Checksum and Reputation Clearinghouse
28 * software
29 *
30 * THE SOFTWARE IS PROVIDED "AS IS" AND RHYOLITE SOFTWARE, LLC DISCLAIMS ALL
31 * WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES
32 * OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL RHYOLITE SOFTWARE, LLC
33 * BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES
34 * OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
35 * WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION,
36 * ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
37 * SOFTWARE.
38 *
39 * Rhyolite Software DCC 1.3.103-1.8 $Revision$
40 */
41
42 #include "dcc_defs.h"
43 #include <math.h>
44
45 #define PROGNAME "rrd-combine: "
46
47 /* *********************************************************************** */
48 /* copied from rrd.h and rrd_format.h */
49 typedef double rrd_value_t;
50
51 typedef union unival {
52 unsigned long u_cnt;
53 rrd_value_t u_val;
54 } unival;
55
56 typedef struct stat_head_t {
57 char cookie[4];
58 char version[5];
59 double float_cookie;
60 unsigned long ds_cnt;
61 unsigned long rra_cnt;
62 unsigned long pdp_step;
63 unival par[10];
64 } stat_head_t;
65
66 #define DS_NAM_SIZE 20
67 #define DST_SIZE 20
68 typedef struct ds_def_t {
69 char ds_nam[DS_NAM_SIZE];
70 char dst[DST_SIZE];
71 unival par[10];
72 } ds_def_t;
73
74 #define CF_NAM_SIZE 20
75 #define MAX_RRA_PAR_EN 10
76 typedef struct rra_def_t {
77 char cf_nam[CF_NAM_SIZE];
78 unsigned long row_cnt;
79 unsigned long pdp_cnt;
80 unival par[MAX_RRA_PAR_EN];
81 } rra_def_t;
82
83
84 #define LAST_DS_LEN 30
85 typedef struct pdp_prep_t{
86 char last_ds[LAST_DS_LEN];
87 unival scratch[10];
88 } pdp_prep_t;
89
90 #define MAX_CDP_PAR_EN 10
91 typedef struct cdp_prep_t{
92 unival scratch[MAX_CDP_PAR_EN];
93 } cdp_prep_t;
94
95 typedef struct rra_ptr_t {
96 unsigned long cur_row;
97 } rra_ptr_t;
98
99 typedef struct rrd_t {
100 const char *fnm;
101 int fd;
102 struct stat sb;
103 stat_head_t *stat_head; /* the static header */
104 ds_def_t *ds_def; /* list of data source definitions */
105 rra_def_t *rra_def; /* list of round robin archive def */
106 struct timeval last_update;
107 pdp_prep_t *pdp_prep; /* pdp data prep area */
108 cdp_prep_t *cdp_prep; /* cdp prep area */
109 rra_ptr_t *rra_ptr; /* list of rra pointers */
110 rrd_value_t *rrd_value; /* list of rrd values */
111 } rrd_t;
112
113 /* *********************************************************************** */
114
115
116 static DCC_EMSG dcc_emsg;
117
118 static const char *ofile;
119 static u_char force;
120 static u_char keep_mtime;
121 static u_char verbose;
122
123
124 static void NRATTRIB
125 usage(void)
126 {
127 dcc_logbad(EX_USAGE,
128 "usage: [-fkv] [-d dir] -o ofile ifile1 ifile2 ...");
129 }
130
131
132
133 static void
134 unmap_rrd(rrd_t *rrd)
135 {
136 if (rrd->stat_head) {
137 if (munmap(rrd->stat_head, rrd->sb.st_size) < 0)
138 dcc_logbad(EX_IOERR,
139 PROGNAME"munmap(%s, "OFF_DPAT"): %s",
140 rrd->fnm, rrd->sb.st_size,
141 strerror(errno));
142 }
143 }
144
145
146
147 static void
148 rrd_close(rrd_t *rrd)
149 {
150 unmap_rrd(rrd);
151
152 if (rrd->fd >= 0) {
153 if (close(rrd->fd) < 0)
154 dcc_logbad(EX_IOERR, PROGNAME"close(%s): %s",
155 rrd->fnm, strerror(errno));
156 rrd->fd = -1;
157 }
158 }
159
160
161
162 static void
163 rrd_stat(rrd_t *rrd)
164 {
165 if (0 > fstat(rrd->fd, &rrd->sb))
166 dcc_logbad(EX_IOERR, PROGNAME"stat(%s): %s",
167 rrd->fnm, strerror(errno));
168
169 if (rrd->sb.st_size <= ISZ(rrd->stat_head)) {
170 dcc_logbad(EX_DATAERR,
171 PROGNAME"%s has too small size "OFF_DPAT"",
172 rrd->fnm, rrd->sb.st_size);
173 }
174 }
175
176
177
178 static void
179 map_rrd(rrd_t *rrd, u_char in)
180 {
181 struct timeval *tv;
182 unmap_rrd(rrd);
183
184 rrd_stat(rrd);
185
186 rrd->stat_head = (stat_head_t *)mmap(0, rrd->sb.st_size,
187 in
188 ? PROT_READ
189 :(PROT_READ | PROT_WRITE),
190 MAP_SHARED,
191 rrd->fd, 0);
192 if (rrd->stat_head == (stat_head_t *)MAP_FAILED)
193 dcc_logbad(EX_IOERR, PROGNAME"mmap(%s): %s",
194 ofile, strerror(errno));
195
196 rrd->ds_def = (ds_def_t *)&rrd->stat_head[1];
197 rrd->rra_def = (rra_def_t *)&rrd->ds_def[rrd->stat_head->ds_cnt];
198 tv = (struct timeval *)&rrd->rra_def[rrd->stat_head->rra_cnt];
199 rrd->last_update.tv_sec = tv->tv_sec;
200 if (!strcmp(rrd->stat_head->version, "0001")) {
201 rrd->last_update.tv_usec = 0;
202 rrd->pdp_prep = (pdp_prep_t *)(&tv->tv_sec + 1);
203 } else {
204 rrd->last_update.tv_usec = tv->tv_usec;
205 rrd->pdp_prep = (pdp_prep_t *)(tv + 1);
206 }
207 rrd->cdp_prep = (cdp_prep_t *)&rrd->pdp_prep[rrd->stat_head->ds_cnt];
208 rrd->rra_ptr = (rra_ptr_t *)&rrd->cdp_prep[rrd->stat_head->rra_cnt
209 * rrd->stat_head->ds_cnt];
210 rrd->rrd_value =(rrd_value_t*)&rrd->rra_ptr[rrd->stat_head->rra_cnt];
211 }
212
213
214
215 static void
216 map_ifile(rrd_t *rrd, const char *new_fnm, const rrd_t *o_rrd)
217 {
218 # define PAD 128
219 const rra_def_t *i_def, *o_def;
220 const char *old_fnm;
221 off_t old_len;
222 u_long l;
223 int i;
224
225 old_fnm = rrd->fnm;
226 old_len = rrd->sb.st_size;
227
228
229 if (rrd->fd >= 0)
230 rrd_close(rrd);
231
232 rrd->fnm = new_fnm;
233 rrd->fd = open(rrd->fnm, O_RDONLY, 0);
234 if (rrd->fd < 0)
235 dcc_logbad(EX_USAGE, PROGNAME"open(%s): %s",
236 rrd->fnm, strerror(errno));
237
238 rrd_stat(rrd);
239
240 if (old_fnm && rrd->sb.st_size >= old_len + PAD)
241 dcc_logbad(EX_DATAERR,
242 PROGNAME"%s has "OFF_DPAT" bytes, more than "
243 OFF_DPAT" in %s",
244 new_fnm, rrd->sb.st_size, old_len, old_fnm);
245
246 map_rrd(rrd, 1);
247
248 if (o_rrd) {
249 if (rrd->stat_head->rra_cnt != o_rrd->stat_head->rra_cnt)
250 dcc_logbad(EX_DATAERR,
251 PROGNAME"%ld instead of %ld RRAs in %s",
252 rrd->stat_head->rra_cnt,
253 o_rrd->stat_head->rra_cnt,
254 rrd->fnm);
255 if (rrd->stat_head->ds_cnt != o_rrd->stat_head->ds_cnt)
256 dcc_logbad(EX_DATAERR,
257 PROGNAME"%ld instead of %ld DSs in %s",
258 rrd->stat_head->ds_cnt,
259 o_rrd->stat_head->ds_cnt,
260 rrd->fnm);
261 if (rrd->stat_head->pdp_step != o_rrd->stat_head->pdp_step)
262 dcc_logbad(EX_DATAERR,
263 PROGNAME"%ld instead of %ld step in %s",
264 rrd->stat_head->pdp_step,
265 o_rrd->stat_head->pdp_step,
266 rrd->fnm);
267 for (l = 0, i_def = rrd->rra_def, o_def = o_rrd->rra_def;
268 l < o_rrd->stat_head->rra_cnt;
269 ++l, ++i_def, ++o_def) {
270 if (o_def->row_cnt != i_def->row_cnt)
271 dcc_logbad(EX_DATAERR,
272 PROGNAME"%ld instead of %ld"
273 " rows in RRA #%d in %s",
274 i_def->row_cnt, o_def->row_cnt,
275 i, rrd->fnm);
276 if (o_def->pdp_cnt != i_def->pdp_cnt)
277 dcc_logbad(EX_DATAERR,
278 PROGNAME"%ld instead of %ld"
279 " data points in RRA #%d in %s",
280 i_def->pdp_cnt, o_def->pdp_cnt,
281 i, rrd->fnm);
282 }
283 }
284 }
285
286
287
288 int NRATTRIB
289 main(int argc, char **argv)
290 {
291 rrd_t o_rrd, i_rrd;
292 int fno, len;
293 rrd_value_t *i_base, *o_base;
294 u_long rrd;
295 int rows, cols, row, i_row, o_row, col;
296 rrd_value_t i_val, o_val;
297 struct timeval tv;
298 int newest;
299 char tbuf[30];
300 int i;
301
302 memset(&i_rrd, 0, sizeof(i_rrd));
303 i_rrd.fd = -1;
304 memset(&o_rrd, 0, sizeof(o_rrd));
305 o_rrd.fd = -1;
306
307 while ((i = getopt(argc, argv, "fkvd:o:")) != -1) {
308 switch (i) {
309 case 'f':
310 force = 1;
311 break;
312 case 'k':
313 keep_mtime = 1;
314 break;
315 case 'v':
316 ++verbose;
317 break;
318 case 'd':
319 if (0 > chdir(optarg))
320 dcc_logbad(EX_USAGE, "chdir(%s): %s",
321 optarg, strerror(errno));
322 break;
323 case 'o':
324 ofile = optarg;
325 break;
326 default:
327 usage();
328 }
329 }
330 argc -= optind;
331 argv += optind;
332 if (argc < 2 || !ofile)
333 usage();
334
335 /* find the newest file */
336 map_ifile(&i_rrd, argv[0], 0);
337 tv = i_rrd.last_update;
338 newest = 0;
339 if (verbose > 1)
340 dcc_trace_msg(PROGNAME"%s last updated %s.%06d",
341 argv[0],
342 dcc_time2str(tbuf, sizeof(tbuf), "%X", tv.tv_sec),
343 (int)tv.tv_usec);
344 for (fno = 1; fno < argc; ++fno) {
345 map_ifile(&i_rrd, argv[fno], 0);
346 if (tv.tv_sec > i_rrd.last_update.tv_sec
347 || (tv.tv_sec == i_rrd.last_update.tv_sec
348 && tv.tv_usec > i_rrd.last_update.tv_usec))
349 continue;
350
351 if (verbose > 1)
352 dcc_trace_msg("%40s last updated %s.%06d",
353 argv[fno],
354 dcc_time2str(tbuf, sizeof(tbuf),
355 "%X",
356 i_rrd.last_update.tv_sec),
357 (int)i_rrd.last_update.tv_usec);
358 tv = i_rrd.last_update;
359 newest = fno;
360 }
361 if (verbose > 1)
362 dcc_trace_msg(" %s is newest", argv[newest]);
363
364 /* create and mmap() the output file */
365 o_rrd.fd = open(ofile,
366 O_RDWR | O_CREAT | (force ? O_TRUNC : O_EXCL),
367 0666);
368 if (o_rrd.fd < 0)
369 dcc_logbad(EX_IOERR, PROGNAME"open(%s): %s",
370 ofile, strerror(errno));
371
372 /* copy the newest input file to the output file */
373 map_ifile(&i_rrd, argv[newest], 0);
374 len = write(o_rrd.fd, i_rrd.stat_head, i_rrd.sb.st_size);
375 if (len != i_rrd.sb.st_size)
376 dcc_logbad(EX_IOERR, PROGNAME"write(%s, "OFF_DPAT") = %d: %s",
377 o_rrd.fnm, i_rrd.sb.st_size, len, strerror(errno));
378
379 map_rrd(&o_rrd, 0);
380
381 for (fno = 0; fno < argc; ++fno) {
382 if (fno == newest)
383 continue;
384
385 map_ifile(&i_rrd, argv[fno], &o_rrd);
386
387 i_base = i_rrd.rrd_value;
388 o_base = o_rrd.rrd_value;
389 for (rrd = 0; rrd < o_rrd.stat_head->rra_cnt; ++rrd) {
390 rows = o_rrd.rra_def[rrd].row_cnt;
391 cols = i_rrd.stat_head->ds_cnt;
392
393 /* find last row in the two RRDs numbered as
394 * data consolidation moments since the UNIX epoch */
395 i_row = (i_rrd.last_update.tv_sec
396 / (i_rrd.rra_def[rrd].pdp_cnt
397 * i_rrd.stat_head->pdp_step));
398
399 o_row = (o_rrd.last_update.tv_sec
400 / (o_rrd.rra_def[rrd].pdp_cnt
401 * o_rrd.stat_head->pdp_step));
402
403 /* Find the number of rows to combine. */
404 i = o_row - i_row;
405 if (i >= 0) {
406 /* If the output RRD is newer than the input,
407 * then we will add only some of the input
408 * rows. */
409 row = rows - i;
410 } else {
411 /* we have problems if the output is older */
412 dcc_error_msg(PROGNAME
413 "%s newer than %s",
414 argv[fno], argv[1]);
415 dcc_error_msg(" i_rrd.last_update.tv_sec"
416 " / (rra_def[%lu].pdp_cnt"
417 " * stat_head->pdp_step)"
418 "\n\t= %d / (%lu * %lu) = %d",
419 rrd,
420 i_rrd.last_update.tv_sec,
421 i_rrd.rra_def[rrd].pdp_cnt,
422 i_rrd.stat_head->pdp_step,
423 i_row);
424 dcc_logbad(EX_DATAERR,
425 " o_rrd.last_update.tv_sec"
426 " / (rra_def[%lu].pdp_cnt"
427 " * stat_head->pdp_step)"
428 "\n\t= %d / (%lu * %lu) = %d",
429 rrd,
430 o_rrd.last_update.tv_sec,
431 o_rrd.rra_def[rrd].pdp_cnt,
432 o_rrd.stat_head->pdp_step,
433 o_row);
434 }
435
436 i_row = (i_rrd.rra_ptr[rrd].cur_row + 1) * cols;
437 o_row = (o_rrd.rra_ptr[rrd].cur_row + 1) * cols;
438 do {
439 /* wrap to the start at the last row */
440 if (i_row >= rows*cols)
441 i_row = 0;
442 if (o_row >= rows*cols)
443 o_row = 0;
444
445 for (col = 0;
446 col < cols;
447 ++col, ++i_row, ++o_row) {
448 i_val = i_base[i_row];
449 if (isnan(i_val))
450 continue;
451 o_val = o_base[o_row];
452 if (isnan(o_val)) {
453 o_val = i_val;
454 } else {
455 o_val += i_val;
456 }
457 o_base[o_row] = o_val;
458 }
459 } while (--row > 0);
460
461 i_base += rows * cols;
462 o_base += rows * cols;
463 }
464 }
465
466
467 unmap_rrd(&o_rrd);
468 fsync(o_rrd.fd);
469 if (!keep_mtime
470 && !dcc_set_mtime(dcc_emsg, ofile, o_rrd.fd, &o_rrd.last_update)) {
471 dcc_logbad(EX_IOERR, PROGNAME"%s", dcc_emsg);
472 exit(1);
473 }
474
475 exit(0);
476 }