How to connect an external Cassandra data store

#1

Hi,

I am trying to connect an external data store, i.e. Cassandra, to C3. Both C3 and Cassandra are Docker containers running locally.

SqlSourceSystem.create({id: 'myNoSqlExternalSource',
                        url: 'jdbc:cassandra:<hostIpv4>:7000:<keyspacename>',
                        datastore: 'cassandra',
                        serverEndpoint: '<hostIpv4>',
                        port: '7000'});

SqlSourceSystem.get('myNoSqlExternalSource').ping() throws the following error:

Datastore type cassandra specified for SqlSourceCollection myNoSqlExternalSource or inferrable from its url ‘jdbc:cassandra::7000:’ in tenant/tag CatDataLake/prod is not supported.

I am not sure whether the SqlSourceSystem record is a valid entry. Feel free to suggest a valid URL format for connecting a Cassandra data store. Any help is appreciated. Thank you.


#2

Cassandra is not SQL, so SqlSourceSystem isn’t going to work. In 7.8 there does not appear to be a KeyValueSqlSourceSystem. You should check c3ShowType(SourceSystem).


#3

And then I noticed your driver, which is super surprising to me… there’s a JDBC driver for Cassandra??? (e.g. https://www.simba.com/drivers/cassandra-odbc-jdbc/) Sorry, I’m out of my depth.


#4

You are right, there is no such JDBC driver. I also tried to connect without populating the url attribute and got the same error response. Based on your first reply, I am assuming there is a different type called KeyValueSqlSourceSystem that is available from v7.9.x; the c3-docker version I’m currently using is 7.8.44.1.


#5

Just looking at my local version of 7.9, I do not see KeyValueSourceSystem, which is what I would expect you to use to connect to Cassandra. Please get in touch with your C3 account manager to understand when this feature could be delivered.


#6

C3-provided source systems exist only for SQL and file data today. Additionally, SQL support is available only for a subset of relational databases that have been certified by C3. Unfortunately, SQL support varies across databases; for example, the same JDBC APIs used to write data for Postgres and Snowflake are not supported by Hive and Impala. For this reason, additional testing and certification is required.

That said, do not despair. A connector for Cassandra does in fact exist. It’s a “low-level” connector, providing basic connectivity and API support. In 7.8/7.9, you will need to implement the fetch and create/upsert APIs yourself.

Here is a basic example with plenty of opportunity for improvement. NOTE: password handling is omitted from this example.

CassandraHelper.c3typ - helper type to process fetch requests

type CassandraHelper<T> {
  
  /**
   * Fetches data from Cassandra and creates a FetchResult from the response
   * @param spec - C3 Fetch specification
   * @returns FetchResults
   */
  fetch: function(spec : FetchSpec) : FetchResult<T> js server
  
}

CassandraHelper.js - helper impl to connect to and query Cassandra

var log = C3.logger("CassandraHelper");

/**
 * Fetches data from Cassandra and creates a FetchResult from the response
 * @param spec - Fetch specification
 * @returns FetchResults
 */
function fetch(spec) {
  
	var typeName = c3CurrentAction().getTarget().getType();
	var tag = MetadataStore.tag(); 
	var typ = tag.readType(typeName); 
	var rows = [];
	var query = null;

	//Construct Cassandra connection spec
  	var connectionSpec = createCassandraConnectionSpec(typeName);

	if (connectionSpec != undefined) {
		
		// Get connection to Cassandra
		var cass = Cassandra.connect(connectionSpec);

		// Construct CQL query from C3 Fetch Spec
		query = constructCassandraQuery(spec, typeName);

		// Execute query
		var queryResults = cass.executeQuery(query);

		// Transform Cassandra query results to C3 FetchResults
		if (queryResults.length > 0) {
		  rows = createC3TypeFromCassandraResults(typeName, queryResults);
		}

		return {
			objs: rows,
			count: rows.length,
			hasMore: false
		};
	} else {
		 throw new Error("unable to find CassandraSourceCollection for " + typeName);
	}
}

/**
 * Converts CassandraRow rows to a C3 Type
 * @param typeName - Name of the type to instantiate
 * @param queryResults - list of records from Cassandra
 * @returns [] - array of records to be included in the FetchResult
 */
function createC3TypeFromCassandraResults(typeName, queryResults) {
	var rows = [];
	var typ = c3Type(typeName);
	
	queryResults.forEach(function (item) {
		var o = typ.make();

		// keys/vals represent the individual column values returned by Cassandra
		var keys = item.values.keys();
		var vals = item.values.values();
		var i = vals.length;

		/**
		 * For the moment, assume that field and column names are the same.
		 * If they are different, a lookup is required.
		 */
		for (var j = 0; j< i; j++) {
			if (vals[j] != null) {
				o = o.putField(keys[j], vals[j]);
				log.info(vals[j]);
			}
		}
		
		rows.push(o);
		
	});
	
	return rows;
}

/**
 * Constructs a Cassandra query from a FetchSpec.  For the moment it supports the most basic of queries. 
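 * @param fetchSpec - C3 Fetch specification (only filter and limit are used)
 * @param typeName - name of the type whose schema name identifies the Cassandra table
 * @returns string - Cassandra query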
 */
function constructCassandraQuery(fetchSpec, typeName) {
	var typ = c3Type(typeName);
	var vars;
	var allowFiltering = false;
	var query = "select * from " + typ.extensions().schema.name;

	// Construct predicate. Only a single "column == value" filter is handled.
	if (fetchSpec != undefined && fetchSpec.filter != undefined) {
		vars = fetchSpec.filter.split("==");
		allowFiltering = true;
		query = query + " where " + vars[0].trim() + " = " + vars[1].trim();
	}

  	// Add limit if specified
  	if (fetchSpec != undefined && fetchSpec.limit != undefined) {
  		query = query + " limit " + fetchSpec.limit;
  	} else {
  		query = query + " limit 100";
  	}

  	// Append ALLOW FILTERING when a predicate is present
  	if (allowFiltering == false) {
  		query = query + ";";
  	} else {
  		query = query + " ALLOW FILTERING;";
  	}
	
	return query;
}

/**
 * Constructs a Cassandra connection spec based on information in the CassandraSourceCollection
 * and CassandraSourceSystem for the type definition
 * @param typeName - the name of the type in the fetch call
 * @returns CassandraConnectSpec, or null if no CassandraSourceCollection exists for the type
 */
function createCassandraConnectionSpec(typeName) {
	var spec, sourceSystem;
	var collection = CassandraSourceCollection.fetch({filter:"id=='" + typeName + "'", include:"id, sourceSystem", limit:1});

	if (collection.count > 0) {
		sourceSystem = CassandraSourceSystem.get(collection.objs[0].sourceSystem.id);

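		// NOTE: passwordVal is not defined in this example (password handling is omitted);
		// it must be supplied before this code will run.
		spec = CassandraConnectSpec.make({
			serverEndpoint: sourceSystem.serverEndpoint[0],
			port: sourceSystem.port,
			username: sourceSystem.username,
			password: passwordVal
		});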

		return spec;

	} else {
		return null;
	}
	
}
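
The helper above covers only fetch. For the create/upsert side, a rough sketch of a statement builder might look like the following. The names constructCassandraUpsert and toCqlLiteral are hypothetical and not part of C3, and the sketch assumes plain string, number, and boolean column values. It relies on the fact that a CQL INSERT overwrites an existing row with the same primary key, so it behaves as an upsert. How the statement is then executed is left out, since the example above only shows the connector's executeQuery call.

```javascript
/**
 * Builds a CQL INSERT statement for one record.
 * Hypothetical helper for illustration; not part of C3.
 * @param tableName - fully qualified table, e.g. 'keyspace.table_name'
 * @param obj - plain object whose keys are assumed to match the column names
 * @returns string - CQL INSERT statement
 */
function constructCassandraUpsert(tableName, obj) {
  var cols = [];
  var vals = [];

  Object.keys(obj).forEach(function (key) {
    var val = obj[key];
    if (val === null || val === undefined) {
      return; // skip unset fields so existing column values are left untouched
    }
    cols.push(key);
    vals.push(toCqlLiteral(val));
  });

  if (cols.length === 0) {
    throw new Error("no columns to write for " + tableName);
  }

  return "insert into " + tableName +
         " (" + cols.join(", ") + ") values (" + vals.join(", ") + ");";
}

/**
 * Renders a JavaScript value as a CQL literal (strings, numbers, booleans only).
 */
function toCqlLiteral(val) {
  if (typeof val === "number" || typeof val === "boolean") {
    return String(val);
  }
  // CQL string literals use single quotes; an embedded quote is escaped by doubling it
  return "'" + String(val).replace(/'/g, "''") + "'";
}
```

Quoting values by hand is only a stopgap; if the connector offers bound parameters, those would be the safer choice.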

CassandraSourceSystem.c3typ - source system definition for Cassandra. Could also use KVSourceSystem if you’re so inclined.

entity type CassandraSourceSystem extends ExternalSourceSystem mixes Config type key 'CASS' {

	keySpace: string
	port: int
	serverEndpoint: [string] schema name 'CASS_SVRS'
	username: string

}

CassandraSourceCollection.c3typ - source collection for Cassandra. Could also use KVSourceCollection if you are so inclined.

entity type CassandraSourceCollection extends ExternalSourceCollection type key 'CASSCOL'

MyCassandraType.c3typ - type that models the Cassandra table.

type MyCassandraType mixes CassandraHelper<MyCassandraType>, External, NoSystemCols, Config schema name 'keyspace.table_name' {

    field_one: string
    field_two: string
    field_three: string
    field_four: string

}

MyCassandraType.fetch() can now be used to fetch data from an external Cassandra DB.
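
To see what a call such as MyCassandraType.fetch({filter: "field_one == 'abc'", limit: 10}) would send to Cassandra, here is a standalone sketch that mirrors the logic of constructCassandraQuery. The function name previewCassandraQuery is hypothetical, and the table name is passed in directly on the assumption that the type's schema name resolves to 'keyspace.table_name'.

```javascript
// Standalone mirror of constructCassandraQuery for illustration: the table name
// is passed in directly instead of being read from the C3 type metadata.
function previewCassandraQuery(tableName, fetchSpec) {
  var query = "select * from " + tableName;
  var hasFilter = fetchSpec != undefined && fetchSpec.filter != undefined;

  if (hasFilter) {
    // Only a single "column == value" predicate is understood
    var parts = fetchSpec.filter.split("==");
    query += " where " + parts[0].trim() + " = " + parts[1].trim();
  }

  // Default to 100 rows when no limit is given, as the helper does
  var limit = (fetchSpec != undefined && fetchSpec.limit != undefined) ? fetchSpec.limit : 100;
  query += " limit " + limit;

  return query + (hasFilter ? " ALLOW FILTERING;" : ";");
}

console.log(previewCassandraQuery("keyspace.table_name", { filter: "field_one == 'abc'", limit: 10 }));
// select * from keyspace.table_name where field_one = 'abc' limit 10 ALLOW FILTERING;

console.log(previewCassandraQuery("keyspace.table_name"));
// select * from keyspace.table_name limit 100;
```

Note that ALLOW FILTERING lets Cassandra scan rows that the primary key cannot narrow down, so filtered fetches on large tables can be slow.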

