public abstract class AbstractRichFunction implements RichFunction, AthenaCodisInterface, AthenaHbaseInterface, AthenaMediaJdbcInterface, Serializable {
private static final Logger logger = LoggerFactory.getLogger(AbstractRichFunction.class);
private static final long serialVersionUID = 1L;
private External external;
private transient CodisExternalClient<String, MediaMessage> mediaCodisClient;
private transient CodisExternalClient<String, PostMessage> postCodisClient;
private transient CodisExternalClient<String, Long> shunCodisClient;
private transient HbaseExternalClient<String, PostMessage> postHbaseClient;
private transient JdbcExternalClient<String, MediaPostEventMessage> weMediaJdbcClient;
@Override
public void open(Configuration parameters) throws Exception {
    // Read the Athena external-resource configuration from the global job parameters.
    ParameterTool parameterTool = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
    String athenaConfig = parameterTool.get(ATHENA_CONFIG_NAME);
    this.external = JSONObject.parseObject(athenaConfig, External.class);
    // Delegate subclass-specific initialization.
    this.open0(parameters);
}
public abstract void open0(Configuration parameters) throws Exception;
private transient RuntimeContext runtimeContext;
@Override
public void setRuntimeContext(RuntimeContext t) {
    this.runtimeContext = t;
}
@Override
public RuntimeContext getRuntimeContext() {
    if (this.runtimeContext != null) {
        return this.runtimeContext;
    } else {
        throw new IllegalStateException("The runtime context has not been initialized.");
    }
}
@Override
public IterationRuntimeContext getIterationRuntimeContext() {
    if (this.runtimeContext == null) {
        throw new IllegalStateException("The runtime context has not been initialized.");
    } else if (this.runtimeContext instanceof IterationRuntimeContext) {
        return (IterationRuntimeContext) this.runtimeContext;
    } else {
        throw new IllegalStateException("This stub is not part of an iteration step function.");
    }
}
@Override public void close() throws Exception {}
@Override
public synchronized CodisExternalClient<String, MediaMessage> getMediaCodisClient() throws Exception {
    if (mediaCodisClient != null) {
        return mediaCodisClient;
    }
    Map<AthenaExternalEnum, CodisExternal> map = external.getCodis();
    CodisExternal codisExternal = map.get(AthenaExternalEnum.MEDIA_CACHE_EVENT);
    mediaCodisClient = codisExternal.getClient();
    return mediaCodisClient;
}
@Override
public synchronized CodisExternalClient<String, PostMessage> getPostCodisClient() throws Exception {
    if (postCodisClient != null) {
        return postCodisClient;
    }
    Map<AthenaExternalEnum, CodisExternal> map = external.getCodis();
    CodisExternal codisExternal = map.get(AthenaExternalEnum.POST_CACHE_EVENT);
    postCodisClient = codisExternal.getClient();
    return postCodisClient;
}
@Override
public synchronized CodisExternalClient<String, Long> getShunCodisClient() throws Exception {
    if (shunCodisClient != null) {
        return shunCodisClient;
    }
    Map<AthenaExternalEnum, CodisExternal> map = external.getCodis();
    CodisExternal codisExternal = map.get(AthenaExternalEnum.SHUN_CACHE_EVENT);
    shunCodisClient = codisExternal.getClient();
    return shunCodisClient;
}
@Override
public synchronized JdbcExternalClient<String, MediaPostEventMessage> getWeMediaJdbcClient() throws Exception {
    if (weMediaJdbcClient != null) {
        return weMediaJdbcClient;
    }
    Map<AthenaExternalEnum, JdbcExternal> map = external.getJdbc();
    JdbcExternal jdbcExternal = map.get(AthenaExternalEnum.WEMEDIA_MYSQL_EVENT);
    // Store the client in the field; otherwise the null check above never succeeds
    // and every call requests a new client.
    weMediaJdbcClient = jdbcExternal.getClient();
    return weMediaJdbcClient;
}
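@Override
public synchronized HbaseExternalClient<String, PostMessage> getPostHbaseClient() throws Exception {
    // Reuse the cached client in the otherwise unused postHbaseClient field,
    // matching the lazy-initialization pattern of the Codis getters.
    if (postHbaseClient == null) {
        postHbaseClient = external.getHbase().get(AthenaExternalEnum.POST_HBASE_EVENT).getClient();
    }
    return postHbaseClient;
}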
@Override
public HbaseExternalClient<String, MediaMessage> getMediaHbaseClient() throws Exception {
    return external.getHbase().get(AthenaExternalEnum.MEDIA_HBASE_EVENT).getClient();
}
}