junit - Testing Storm Bolts and Spouts -
this general question regarding unit testing bolts , spouts in storm topology written in java.
what recommended practice , guideline unit-testing (junit?) bolts , spouts?
for instance, write junit test bolt
, without understanding framework (like lifecycle of bolt
) , serialization implications, make mistake of constructor-based creation of non-serializable member variables. in junit, test pass, in topology, wouldn't work. imagine there many test points 1 needs consider (such example serialization & lifecycle).
therefore, recommended if use junit based unit tests, run small mock topology (localmode
?) , test implied contract bolt
(or spout
) under topology? or, ok use junit, implication being have simulate lifecycle of bolt (creating it, calling prepare()
, mocking config
, etc) carefully? in case, general test points class under test (bolt/spout) consider?
what have other developers done, respect creating proper unit tests?
i noticed there topology testing api (see: https://github.com/xumingming/storm-lib/blob/master/src/jvm/storm/testingapidemo.java). better use of api, , stand "test topologies" each individual bolt
& spout
(and verifying implicit contract bolt has provide for, eg - it's declared outputs)?
thanks
our approach use constructor-injection of serializable factory spout/bolt. spout/bolt consults factory in open/prepare method. factory's single responsibility encapsulate obtaining spout/bolt's dependencies in serializable fashion. design allows our unit tests inject fake/test/mock factories which, when consulted, return mock services. in way can narrowly unit test spout/bolts using mocks e.g. mockito.
below generic example of bolt , test it. have omitted implementation of factory usernotificationfactory
because depends on application. might use service locators obtain services, serialized configuration, hdfs-accessible configuration, or way @ correct services, long factory can after serde cycle. should cover serialization of class.
bolt
public class notifyuserbolt extends basebasicbolt { public static final string name = "notifyuser"; private static final string user_id_field_name = "userid"; private final usernotifierfactory factory; transient private usernotifier notifier; public notifyuserbolt(usernotifierfactory factory) { checknotnull(factory); this.factory = factory; } @override public void prepare(map stormconf, topologycontext context) { notifier = factory.createusernotifier(); } @override public void execute(tuple input, basicoutputcollector collector) { // check ensures time-dependency imposed storm has been observed checkstate(notifier != null, "unable execute because user notifier unavailable. bolt prepared?"); long userid = input.getlongbyfield(previousbolt.user_id_field_name); notifier.notifyuser(userid); collector.emit(new values(userid)); } @override public void declareoutputfields(outputfieldsdeclarer declarer) { declarer.declare(new fields(user_id_field_name)); } }
test
public class notifyuserbolttest { private notifyuserbolt bolt; @mock private topologycontext topologycontext; @mock private usernotifier notifier; // test implementation allows mock unit-under-test. private class testfactory implements usernotifierfactory { private final usernotifier notifier; private testfactory(usernotifier notifier) { this.notifier = notifier; } @override public usernotifier createusernotifier() { return notifier; } } @before public void before() { mockitoannotations.initmocks(this); // factory return our mock `notifier` bolt = new notifyuserbolt(new testfactory(notifier)); // bolt holding on our mock , under our control! bolt.prepare(new config(), topologycontext); } @test public void testexecute() { long userid = 24; tuple tuple = mock(tuple.class); when(tuple.getlongbyfield(previousbolt.user_id_field_name)).thenreturn(userid); basicoutputcollector collector = mock(basicoutputcollector.class); bolt.execute(tuple, collector); // here verify call on `notifier`, have stubbed out behavior befor // call execute, too. verify(notifier).notifyuser(userid); verify(collector).emit(new values(userid)); } }
Comments
Post a Comment