Application Monitoring


#1

Set up monitoring (potentially through a CronJob) for critical jobs and queues, and get notified by email when things go wrong. Example usage:

// startMonitor is declared with [string] parameters, so pass arrays rather than
// comma-separated strings. Each job entry is formatted as '<JobType>:<jobId>'.
var jobTypeIDList = ['MyMRJob:job-id', 'MyBatchJob:batch-id'];
var canonicalList = ['Canonical1', 'Canonical2'];
ApplicationMonitoring.startMonitor(jobTypeIDList, canonicalList);

Following are the implementation details:
1- Type definition ApplicationMonitoring.c3typ

type ApplicationMonitoring {
    /**
     * Start monitoring and looking for issues.
     *
     * @param jobs
     *           Jobs to check. The JavaScript implementation splits each entry on ':' and
     *           reads it as '<JobType>:<jobId>'.
     * @param canonicals
     *           Canonical names to check. The implementation matches each one against
     *           DataLoadUploadLog.targetTypeName.
     * @return true if an alert is raised, false otherwise.
     */
    startMonitor: function(jobs: [string], canonicals: [string]): boolean js server
}

2- Monitoring implementation ApplicationMonitoring.js

/**
 * Runs every health check and emails a combined report when at least one
 * check found a problem.
 *
 * Section headings are only added for checks that actually reported something,
 * so a fully healthy run produces no email and returns false.
 *
 * @param {string[]} jobs - Job entries formatted as '<JobType>:<jobId>'.
 * @param {string[]} canonicals - Canonical names whose uploaded files are checked.
 * @returns {boolean} true if an alert email was sent, false otherwise.
 */
function startMonitor(jobs, canonicals) {
    // [heading, findings] pairs, in the order they appear in the report.
    var sections = [
        // MR job sequence
        ['Job issues', reportJobsIssues(jobs)],
        // Data loading status
        ['Integration issues', reportIntegrationIssues()],
        // Files (must be 12 per day)
        ['Data issues', reportDataIssues(canonicals)],
        // Cluster
        ['Cluster issues', reportClusterIssues()],
        // Queues
        ['Queues issues', reportQueuesIssues()]
    ];

    var content = '';
    sections.forEach(function(section) {
        var findings = section[1];
        // Skip healthy sections entirely; otherwise the headings alone would
        // make the report non-empty and trigger an email on every run.
        if (findings) {
            content += '\n' + section[0] + ': \n' + findings;
        }
    });

    if (content === '') {
        return false;
    }

    // A TenantConfig entry stores the comma separated list of email recipients.
    var recipients = TenantConfig.getValue('monitoring-receipts').split(',');
    // NOTE(review): the body uses plain '\n' line breaks while content_type is
    // 'text/html' - confirm the mail renders as intended.
    var mail = Mail.make({
        from         : 'root@c3-e.com',
        to           : recipients,
        content_type : 'text/html',
        subject      : '[' + c3Context().tenant + '/' + c3Context().tag + '] Application status report',
        content      : content + '\n\nThis is an automated email. Please do not respond to this email.\n\n'
    });
    Mail.send(mail);
    return true;
}

List of helper functions

/**
 * Builds the "missing files" part of the report.
 *
 * @param {string[]} canonicals - Canonical names to check.
 * @returns {string} One message per canonical for which getMissingDays
 *     reported at least one day; '' when nothing is missing.
 */
function reportDataIssues(canonicals) {
    return canonicals.map(function(canonical) {
        var missingDaysList = getMissingDays(canonical);
        if (_.isEmpty(missingDaysList)) {
            return '';
        }
        // Trailing separator keeps messages for different canonicals apart,
        // matching the format used by the other report helpers.
        return 'Some of the ' + canonical + ' files are missing on these days: ' + missingDaysList + ' \n\n';
    }).join('');
}
/**
 * Summarises data-loading problems found in DataLoadUploadLog.
 *
 * Rows are grouped by (uploadStatus, processLogStatus, status). A group whose
 * three statuses are not all 'SUCCESS' adds a "not in SUCCESS status" message;
 * an all-SUCCESS group whose max(meta.created) is more than 2.5 hours old
 * (2 * 3600000 + 3600000 / 2 ms) adds a "not loading data" message.
 *
 * @returns {string} Concatenated messages, or '' when nothing was flagged.
 */
function reportIntegrationIssues() {
    var recentUploads = Filter.gt('meta.timestamp', DateTime.now().plusDays(-7));
    var query = {
        filter: recentUploads.and.lt('meta.created', DateTime.now().plusMinutes(-30)),
        projection: 'uploadStatus, processLogStatus, status, count(), max(meta.created)',
        group: 'uploadStatus, processLogStatus, status'
    };

    var report = '';
    DataLoadUploadLog.evaluate(query).tuples.each(function(row) {
        // Presumably cells follow the projection order: 0..2 are the three
        // statuses and 4 is max(meta.created) - verify against evaluate().
        var cells = row.cells;
        var allSucceeded = cells[0].str === cells[1].str &&
            cells[1].str === cells[2].str &&
            cells[2].str === 'SUCCESS';

        if (!allSucceeded) {
            report += 'There are files in DataLoadUploadLog not in SUCCESS status, please check \n\n';
        } else if (DateTime.now() - cells[4].date > 2 * 3600000 + (3600000 / 2)) {
            report += 'We are not loading data since more than 2 hours, please check \n\n';
        }
    });
    return report;
}

/**
 * Lists cluster hosts and Cassandra nodes that are not healthy.
 *
 * A host is flagged when its state is not 'RUNNING'; a Cassandra node is
 * flagged unless its status is 'UP' and its state is 'NORMAL'.
 *
 * @returns {string} Concatenated messages, or '' when every node is healthy.
 */
function reportClusterIssues() {
    var findings = [];

    Cluster.hosts().each(function(node) {
        if (node.state !== 'RUNNING') {
            findings.push('Node with id: ' + node.host + ' is not running, please ping OPS \n\n');
        }
    });

    Cassandra.status().each(function(node) {
        var healthy = node.status === 'UP' && node.state === 'NORMAL';
        if (!healthy) {
            findings.push('Cassandra node with id: ' + node.nodeId + ' is not in a good shape, please ping OPS \n\n');
        }
    });

    return findings.join('');
}

/**
 * Reports queues that are paused or have more failed items than tolerated.
 *
 * Only the queues listed in the tolerance table are checked for failures;
 * any other queue is only checked for being paused.
 *
 * @returns {string} Concatenated messages, or '' when no queue was flagged.
 */
function reportQueuesIssues() {
    // Number of failed items tolerated per queue before it is reported.
    var failedTolerance = {
        MapReduceQueue: 0,
        BatchQueue: 0,
        CalcFieldsQueue: 20,
        ActionQueue: 2,
        AnalyticsQueue: 1000
    };

    var findings = [];
    InvalidationQueue.countAll().map(function(entry) {
        var name = entry.queue;
        if (entry.paused) {
            findings.push(name + ' is paused, please check \n\n');
        }
        // For a queue without a tolerance the lookup is undefined and the
        // comparison is false, so nothing is reported.
        if (failedTolerance[name] < entry.failed) {
            findings.push(name + ' has ' + entry.failed + ' items failed, please check \n\n');
        }
    });
    return findings.join('');
}

/**
 * Reports every monitored job that is not completed.
 *
 * @param {string[]} jobs - Entries formatted as '<JobType>:<jobId>'. Only the
 *     first ':' separates type from id, so ids may themselves contain ':'.
 * @returns {string} One message per job that is not completed; '' when all are.
 */
function reportJobsIssues(jobs) {
    return jobs.map(function(entry) {
        var separator = entry.indexOf(':');
        // Without a ':' the whole entry is the type and the id is left undefined.
        var jobType = separator === -1 ? entry : entry.substring(0, separator);
        var jobID = separator === -1 ? undefined : entry.substring(separator + 1);
        if (isJobCompleted(jobType, jobID)) {
            return '';
        }
        return jobType + ' ' + jobID + ' is not completed, please check \n\n';
    }).join('');
}

/**
 * Tells whether the given job reports the status 'completed'.
 *
 * @param {string} jobType - Type name, resolved through c3Type().
 * @param {string} jobID - Id passed to the type's get().
 * @returns {boolean} true only when the job's status().status is 'completed'.
 */
function isJobCompleted(jobType, jobID) {
    var jobInstance = c3Type(jobType).get(jobID);
    var statusInfo = jobInstance.status();
    return statusInfo.status === 'completed';
}

/**
 * Finds the days on which fewer than 12 files were uploaded for a canonical.
 *
 * Looks at the 84 (12 * 7) most recently created DataLoadUploadLog entries for
 * the canonical and counts them per day. The day key is the first 8 characters
 * of the fifth '_'-separated token of each entry's source. Days with no
 * uploads at all never become a key, so they are not reported.
 *
 * @param {string} canonical - Value matched against targetTypeName.
 * @returns {string} Comma-separated day keys with fewer than 12 files; '' when none.
 */
function getMissingDays(canonical) {
    var recentUploads = DataLoadUploadLog.fetch({
        limit: 12 * 7,
        order: 'descending(meta.created)',
        filter: Filter.eq('targetTypeName', canonical)
    });

    var filesPerDay = _.countBy(recentUploads.objs, function(upload) {
        var stampToken = upload.source.split('_')[4];
        return stampToken.substring(0, 8);
    });

    // The first enumerated day is excluded from the check.
    // NOTE(review): presumably meant to skip the newest (still incomplete) day.
    // With all-digit keys such as '20200131', engines enumerate keys in
    // ascending numeric order, so this may drop the oldest day instead -
    // confirm the intent.
    var firstDay = Object.keys(filesPerDay)[0];
    delete filesPerDay[firstDay];

    var incompleteDays = [];
    for (var dayKey in filesPerDay) {
        if (filesPerDay[dayKey] < 12) {
            incompleteDays.push(dayKey);
        }
    }
    return incompleteDays.join();
}

Credit: @marcosordi


Alerting for "no data"