Mercurial > notdcc
diff rrd-combine/rrd-combine.c @ 0:c7f6b056b673
First import of vendor version
author | Peter Gervai <grin@grin.hu> |
---|---|
date | Tue, 10 Mar 2009 13:49:58 +0100 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/rrd-combine/rrd-combine.c Tue Mar 10 13:49:58 2009 +0100 @@ -0,0 +1,476 @@ +/* Distributed Checksum Clearinghouse + * + * combine RRD databases + * + * Copyright (c) 2008 by Rhyolite Software, LLC + * + * This agreement is not applicable to any entity which sells anti-spam + * solutions to others or provides an anti-spam solution as part of a + * security solution sold to other entities, or to a private network + * which employs the DCC or uses data provided by operation of the DCC + * but does not provide corresponding data to other users. + * + * Permission to use, copy, modify, and distribute this software without + * changes for any purpose with or without fee is hereby granted, provided + * that the above copyright notice and this permission notice appear in all + * copies and any distributed versions or copies are either unchanged + * or not called anything similar to "DCC" or "Distributed Checksum + * Clearinghouse". + * + * Parties not eligible to receive a license under this agreement can + * obtain a commercial license to use DCC by contacting Rhyolite Software + * at sales@rhyolite.com. + * + * A commercial license would be for Distributed Checksum and Reputation + * Clearinghouse software. That software includes additional features. This + * free license for Distributed ChecksumClearinghouse Software does not in any + * way grant permision to use Distributed Checksum and Reputation Clearinghouse + * software + * + * THE SOFTWARE IS PROVIDED "AS IS" AND RHYOLITE SOFTWARE, LLC DISCLAIMS ALL + * WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES + * OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL RHYOLITE SOFTWARE, LLC + * BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES + * OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, + * WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, + * ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS + * SOFTWARE. + * + * Rhyolite Software DCC 1.3.103-1.8 $Revision$ + */ + +#include "dcc_defs.h" +#include <math.h> + +#define PROGNAME "rrd-combine: " + +/* *********************************************************************** */ +/* copied from rrd.h and rrd_format.h */ +typedef double rrd_value_t; + +typedef union unival { + unsigned long u_cnt; + rrd_value_t u_val; +} unival; + +typedef struct stat_head_t { + char cookie[4]; + char version[5]; + double float_cookie; + unsigned long ds_cnt; + unsigned long rra_cnt; + unsigned long pdp_step; + unival par[10]; +} stat_head_t; + +#define DS_NAM_SIZE 20 +#define DST_SIZE 20 +typedef struct ds_def_t { + char ds_nam[DS_NAM_SIZE]; + char dst[DST_SIZE]; + unival par[10]; +} ds_def_t; + +#define CF_NAM_SIZE 20 +#define MAX_RRA_PAR_EN 10 +typedef struct rra_def_t { + char cf_nam[CF_NAM_SIZE]; + unsigned long row_cnt; + unsigned long pdp_cnt; + unival par[MAX_RRA_PAR_EN]; +} rra_def_t; + + +#define LAST_DS_LEN 30 +typedef struct pdp_prep_t{ + char last_ds[LAST_DS_LEN]; + unival scratch[10]; +} pdp_prep_t; + +#define MAX_CDP_PAR_EN 10 +typedef struct cdp_prep_t{ + unival scratch[MAX_CDP_PAR_EN]; +} cdp_prep_t; + +typedef struct rra_ptr_t { + unsigned long cur_row; +} rra_ptr_t; + +typedef struct rrd_t { + const char *fnm; + int fd; + struct stat sb; + stat_head_t *stat_head; /* the static header */ + ds_def_t *ds_def; /* list of data source definitions */ + rra_def_t *rra_def; /* list of round robin archive def */ + struct timeval last_update; + pdp_prep_t *pdp_prep; /* pdp data prep area */ + cdp_prep_t *cdp_prep; /* cdp prep area */ + rra_ptr_t *rra_ptr; /* list of rra pointers */ + rrd_value_t *rrd_value; /* list of rrd values */ +} rrd_t; + +/* *********************************************************************** */ + + +static DCC_EMSG dcc_emsg; + +static const char *ofile; +static u_char force; +static u_char keep_mtime; +static u_char verbose; + + +static void NRATTRIB +usage(void) +{ + dcc_logbad(EX_USAGE, + "usage: [-fkv] [-d dir] -o ofile ifile1 ifile2 ..."); +} + + + +static void +unmap_rrd(rrd_t *rrd) +{ + if (rrd->stat_head) { + if (munmap(rrd->stat_head, rrd->sb.st_size) < 0) + dcc_logbad(EX_IOERR, + PROGNAME"munmap(%s, "OFF_DPAT"): %s", + rrd->fnm, rrd->sb.st_size, + strerror(errno)); + } +} + + + +static void +rrd_close(rrd_t *rrd) +{ + unmap_rrd(rrd); + + if (rrd->fd >= 0) { + if (close(rrd->fd) < 0) + dcc_logbad(EX_IOERR, PROGNAME"close(%s): %s", + rrd->fnm, strerror(errno)); + rrd->fd = -1; + } +} + + + +static void +rrd_stat(rrd_t *rrd) +{ + if (0 > fstat(rrd->fd, &rrd->sb)) + dcc_logbad(EX_IOERR, PROGNAME"stat(%s): %s", + rrd->fnm, strerror(errno)); + + if (rrd->sb.st_size <= ISZ(rrd->stat_head)) { + dcc_logbad(EX_DATAERR, + PROGNAME"%s has too small size "OFF_DPAT"", + rrd->fnm, rrd->sb.st_size); + } +} + + + +static void +map_rrd(rrd_t *rrd, u_char in) +{ + struct timeval *tv; + unmap_rrd(rrd); + + rrd_stat(rrd); + + rrd->stat_head = (stat_head_t *)mmap(0, rrd->sb.st_size, + in + ? PROT_READ + :(PROT_READ | PROT_WRITE), + MAP_SHARED, + rrd->fd, 0); + if (rrd->stat_head == (stat_head_t *)MAP_FAILED) + dcc_logbad(EX_IOERR, PROGNAME"mmap(%s): %s", + ofile, strerror(errno)); + + rrd->ds_def = (ds_def_t *)&rrd->stat_head[1]; + rrd->rra_def = (rra_def_t *)&rrd->ds_def[rrd->stat_head->ds_cnt]; + tv = (struct timeval *)&rrd->rra_def[rrd->stat_head->rra_cnt]; + rrd->last_update.tv_sec = tv->tv_sec; + if (!strcmp(rrd->stat_head->version, "0001")) { + rrd->last_update.tv_usec = 0; + rrd->pdp_prep = (pdp_prep_t *)(&tv->tv_sec + 1); + } else { + rrd->last_update.tv_usec = tv->tv_usec; + rrd->pdp_prep = (pdp_prep_t *)(tv + 1); + } + rrd->cdp_prep = (cdp_prep_t *)&rrd->pdp_prep[rrd->stat_head->ds_cnt]; + rrd->rra_ptr = (rra_ptr_t *)&rrd->cdp_prep[rrd->stat_head->rra_cnt + * rrd->stat_head->ds_cnt]; + rrd->rrd_value =(rrd_value_t*)&rrd->rra_ptr[rrd->stat_head->rra_cnt]; +} + + + +static void +map_ifile(rrd_t *rrd, const char *new_fnm, const rrd_t *o_rrd) +{ +# define PAD 128 + const rra_def_t *i_def, *o_def; + const char *old_fnm; + off_t old_len; + u_long l; + int i; + + old_fnm = rrd->fnm; + old_len = rrd->sb.st_size; + + + if (rrd->fd >= 0) + rrd_close(rrd); + + rrd->fnm = new_fnm; + rrd->fd = open(rrd->fnm, O_RDONLY, 0); + if (rrd->fd < 0) + dcc_logbad(EX_USAGE, PROGNAME"open(%s): %s", + rrd->fnm, strerror(errno)); + + rrd_stat(rrd); + + if (old_fnm && rrd->sb.st_size >= old_len + PAD) + dcc_logbad(EX_DATAERR, + PROGNAME"%s has "OFF_DPAT" bytes, more than " + OFF_DPAT" in %s", + new_fnm, rrd->sb.st_size, old_len, old_fnm); + + map_rrd(rrd, 1); + + if (o_rrd) { + if (rrd->stat_head->rra_cnt != o_rrd->stat_head->rra_cnt) + dcc_logbad(EX_DATAERR, + PROGNAME"%ld instead of %ld RRAs in %s", + rrd->stat_head->rra_cnt, + o_rrd->stat_head->rra_cnt, + rrd->fnm); + if (rrd->stat_head->ds_cnt != o_rrd->stat_head->ds_cnt) + dcc_logbad(EX_DATAERR, + PROGNAME"%ld instead of %ld DSs in %s", + rrd->stat_head->ds_cnt, + o_rrd->stat_head->ds_cnt, + rrd->fnm); + if (rrd->stat_head->pdp_step != o_rrd->stat_head->pdp_step) + dcc_logbad(EX_DATAERR, + PROGNAME"%ld instead of %ld step in %s", + rrd->stat_head->pdp_step, + o_rrd->stat_head->pdp_step, + rrd->fnm); + for (l = 0, i_def = rrd->rra_def, o_def = o_rrd->rra_def; + l < o_rrd->stat_head->rra_cnt; + ++l, ++i_def, ++o_def) { + if (o_def->row_cnt != i_def->row_cnt) + dcc_logbad(EX_DATAERR, + PROGNAME"%ld instead of %ld" + " rows in RRA #%d in %s", + i_def->row_cnt, o_def->row_cnt, + i, rrd->fnm); + if (o_def->pdp_cnt != i_def->pdp_cnt) + dcc_logbad(EX_DATAERR, + PROGNAME"%ld instead of %ld" + " data points in RRA #%d in %s", + i_def->pdp_cnt, o_def->pdp_cnt, + i, rrd->fnm); + } + } +} + + + +int NRATTRIB +main(int argc, char **argv) +{ + rrd_t o_rrd, i_rrd; + int fno, len; + rrd_value_t *i_base, *o_base; + u_long rrd; + int rows, cols, row, i_row, o_row, col; + rrd_value_t i_val, o_val; + struct timeval tv; + int newest; + char tbuf[30]; + int i; + + memset(&i_rrd, 0, sizeof(i_rrd)); + i_rrd.fd = -1; + memset(&o_rrd, 0, sizeof(o_rrd)); + o_rrd.fd = -1; + + while ((i = getopt(argc, argv, "fkvd:o:")) != -1) { + switch (i) { + case 'f': + force = 1; + break; + case 'k': + keep_mtime = 1; + break; + case 'v': + ++verbose; + break; + case 'd': + if (0 > chdir(optarg)) + dcc_logbad(EX_USAGE, "chdir(%s): %s", + optarg, strerror(errno)); + break; + case 'o': + ofile = optarg; + break; + default: + usage(); + } + } + argc -= optind; + argv += optind; + if (argc < 2 || !ofile) + usage(); + + /* find the newest file */ + map_ifile(&i_rrd, argv[0], 0); + tv = i_rrd.last_update; + newest = 0; + if (verbose > 1) + dcc_trace_msg(PROGNAME"%s last updated %s.%06d", + argv[0], + dcc_time2str(tbuf, sizeof(tbuf), "%X", tv.tv_sec), + (int)tv.tv_usec); + for (fno = 1; fno < argc; ++fno) { + map_ifile(&i_rrd, argv[fno], 0); + if (tv.tv_sec > i_rrd.last_update.tv_sec + || (tv.tv_sec == i_rrd.last_update.tv_sec + && tv.tv_usec > i_rrd.last_update.tv_usec)) + continue; + + if (verbose > 1) + dcc_trace_msg("%40s last updated %s.%06d", + argv[fno], + dcc_time2str(tbuf, sizeof(tbuf), + "%X", + i_rrd.last_update.tv_sec), + (int)i_rrd.last_update.tv_usec); + tv = i_rrd.last_update; + newest = fno; + } + if (verbose > 1) + dcc_trace_msg(" %s is newest", argv[newest]); + + /* create and mmap() the output file */ + o_rrd.fd = open(ofile, + O_RDWR | O_CREAT | (force ? O_TRUNC : O_EXCL), + 0666); + if (o_rrd.fd < 0) + dcc_logbad(EX_IOERR, PROGNAME"open(%s): %s", + ofile, strerror(errno)); + + /* copy the newest input file to the output file */ + map_ifile(&i_rrd, argv[newest], 0); + len = write(o_rrd.fd, i_rrd.stat_head, i_rrd.sb.st_size); + if (len != i_rrd.sb.st_size) + dcc_logbad(EX_IOERR, PROGNAME"write(%s, "OFF_DPAT") = %d: %s", + o_rrd.fnm, i_rrd.sb.st_size, len, strerror(errno)); + + map_rrd(&o_rrd, 0); + + for (fno = 0; fno < argc; ++fno) { + if (fno == newest) + continue; + + map_ifile(&i_rrd, argv[fno], &o_rrd); + + i_base = i_rrd.rrd_value; + o_base = o_rrd.rrd_value; + for (rrd = 0; rrd < o_rrd.stat_head->rra_cnt; ++rrd) { + rows = o_rrd.rra_def[rrd].row_cnt; + cols = i_rrd.stat_head->ds_cnt; + + /* find last row in the two RRDs numbered as + * data consolidation moments since the UNIX epoch */ + i_row = (i_rrd.last_update.tv_sec + / (i_rrd.rra_def[rrd].pdp_cnt + * i_rrd.stat_head->pdp_step)); + + o_row = (o_rrd.last_update.tv_sec + / (o_rrd.rra_def[rrd].pdp_cnt + * o_rrd.stat_head->pdp_step)); + + /* Find the number of rows to combine. */ + i = o_row - i_row; + if (i >= 0) { + /* If the output RRD is newer than the input, + * then we will add only some of the input + * rows. */ + row = rows - i; + } else { + /* we have problems if the output is older */ + dcc_error_msg(PROGNAME + "%s newer than %s", + argv[fno], argv[1]); + dcc_error_msg(" i_rrd.last_update.tv_sec" + " / (rra_def[%lu].pdp_cnt" + " * stat_head->pdp_step)" + "\n\t= %d / (%lu * %lu) = %d", + rrd, + i_rrd.last_update.tv_sec, + i_rrd.rra_def[rrd].pdp_cnt, + i_rrd.stat_head->pdp_step, + i_row); + dcc_logbad(EX_DATAERR, + " o_rrd.last_update.tv_sec" + " / (rra_def[%lu].pdp_cnt" + " * stat_head->pdp_step)" + "\n\t= %d / (%lu * %lu) = %d", + rrd, + o_rrd.last_update.tv_sec, + o_rrd.rra_def[rrd].pdp_cnt, + o_rrd.stat_head->pdp_step, + o_row); + } + + i_row = (i_rrd.rra_ptr[rrd].cur_row + 1) * cols; + o_row = (o_rrd.rra_ptr[rrd].cur_row + 1) * cols; + do { + /* wrap to the start at the last row */ + if (i_row >= rows*cols) + i_row = 0; + if (o_row >= rows*cols) + o_row = 0; + + for (col = 0; + col < cols; + ++col, ++i_row, ++o_row) { + i_val = i_base[i_row]; + if (isnan(i_val)) + continue; + o_val = o_base[o_row]; + if (isnan(o_val)) { + o_val = i_val; + } else { + o_val += i_val; + } + o_base[o_row] = o_val; + } + } while (--row > 0); + + i_base += rows * cols; + o_base += rows * cols; + } + } + + + unmap_rrd(&o_rrd); + fsync(o_rrd.fd); + if (!keep_mtime + && !dcc_set_mtime(dcc_emsg, ofile, o_rrd.fd, &o_rrd.last_update)) { + dcc_logbad(EX_IOERR, PROGNAME"%s", dcc_emsg); + exit(1); + } + + exit(0); +}