I have been testing HCatalog recently. HCatalog itself is just a standalone JAR. It can also run a service, but that service is simply the metastore Thrift server. When writing a MapReduce job on top of HCatalog, it is enough to add the HCatalog JAR and the corresponding hive-site.xml to libjars and HADOOP_CLASSPATH. Even so, I ran into a problem during testing: after running for a while, the Hive metastore server throws the following error:
    2013-06-19 10:35:51,718 ERROR server.TThreadPoolServer (TThreadPoolServer.java:run(182)) - Error occurred during processing of message.
    javax.jdo.JDOFatalUserException: Persistence Manager has been closed
        at org.datanucleus.jdo.JDOPersistenceManager.assertIsOpen(JDOPersistenceManager.java:2124)
        at org.datanucleus.jdo.JDOPersistenceManager.currentTransaction(JDOPersistenceManager.java:315)
        at org.apache.hadoop.hive.metastore.ObjectStore.openTransaction(ObjectStore.java:294)
        at org.apache.hadoop.hive.metastore.ObjectStore.getTable(ObjectStore.java:732)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
        at java.lang.reflect.Method.invoke(Method.java:597)
        at org.apache.hadoop.hive.metastore.RetryingRawStore.invoke(RetryingRawStore.java:111)
        at com.sun.proxy.$Proxy5.getTable(Unknown Source)
        at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.get_table(HiveMetaStore.java:982)
        at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Processor$get_table.getResult(ThriftHiveMetastore.java:5017)
        at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Processor$get_table.getResult(ThriftHiveMetastore.java:5005)
        at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:32)
        at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:34)
PersistenceManager manages a set of persistent objects, including creating and querying them. It is an instance variable of ObjectStore, and each ObjectStore owns one pm. RawStore is the interface through which the metastore logic layer talks to the underlying physical metadata database (for example Derby), and ObjectStore is the default implementation of RawStore. When the Hive Metastore Server starts, it is given a TProcessor that wraps an HMSHandler. HMSHandler holds an instance variable ThreadLocal<RawStore> threadLocalMS, so each thread maintains its own RawStore:
    private final ThreadLocal<RawStore> threadLocalMS = new ThreadLocal<RawStore>() {
        @Override
        protected synchronized RawStore initialValue() {
            return null;
        }
    };
Every request from a Hive metastore client is assigned a WorkerProcess from the thread pool. Every method in HMSHandler obtains the RawStore instance through getMS() to do its actual work:
    public RawStore getMS() throws MetaException {
        RawStore ms = threadLocalMS.get();
        if (ms == null) {
            ms = newRawStore();
            threadLocalMS.set(ms);
            ms = threadLocalMS.get();
        }
        return ms;
    }
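As the code shows, the RawStore is loaded lazily. Once initialized, it is bound to the thread-local variable so that later calls on the same thread can reuse it: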
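The lazy, per-thread binding described here can be tried out in isolation. The following is a minimal sketch, not Hive code: `DemoStore`, `LazyThreadLocalDemo`, and `getStoreOnNewThread` are hypothetical names introduced only for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for RawStore; not the Hive interface.
class DemoStore {
    private static final AtomicInteger CREATED = new AtomicInteger();
    final int id = CREATED.incrementAndGet();
}

class LazyThreadLocalDemo {
    // No initialValue() override: get() returns null until this thread sets a value.
    private static final ThreadLocal<DemoStore> THREAD_LOCAL_STORE = new ThreadLocal<>();

    // Same shape as getMS(): create on first use, then reuse on this thread.
    static DemoStore getStore() {
        DemoStore store = THREAD_LOCAL_STORE.get();
        if (store == null) {
            store = new DemoStore();
            THREAD_LOCAL_STORE.set(store);
        }
        return store;
    }

    // Calls getStore() on a fresh thread and returns the instance that thread saw.
    static DemoStore getStoreOnNewThread() {
        DemoStore[] seen = new DemoStore[1];
        Thread worker = new Thread(() -> seen[0] = getStore());
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen[0];
    }
}
```

Calling `getStore()` twice on one thread returns the same instance, while a different thread gets its own. The flip side is that whatever state the instance is in stays attached to that thread until the value is removed.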
    private RawStore newRawStore() throws MetaException {
        LOG.info(addPrefix("Opening raw store with implemenation class:" + rawStoreClassName));
        Configuration conf = getConf();
        return RetryingRawStore.getProxy(hiveConf, conf, rawStoreClassName, threadLocalId.get());
    }
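The RawStore is accessed through a dynamic proxy. RetryingRawStore implements the InvocationHandler interface, and its invoke function runs the real logic via method.invoke(). The advantage is that extra logic can be added around method.invoke(): RetryingRawStore catches the exceptions thrown by that call in order to retry. Because reflection is used, the exception arrives wrapped in an InvocationTargetException. In Hive 0.9, however, this exception is rethrown directly after being caught instead of triggering a retry, which is clearly wrong. I modified the code to take out the wrapped target exception, check whether it is an instance of JDOException, and handle it accordingly: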
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        Object ret = null;
        boolean gotNewConnectUrl = false;
        boolean reloadConf = HiveConf.getBoolVar(hiveConf,
            HiveConf.ConfVars.METASTOREFORCERELOADCONF);
        boolean reloadConfOnJdoException = false;

        if (reloadConf) {
            updateConnectionURL(getConf(), null);
        }

        int retryCount = 0;
        Exception caughtException = null;
        while (true) {
            try {
                if (reloadConf || gotNewConnectUrl || reloadConfOnJdoException) {
                    initMS();
                }
                ret = method.invoke(base, args);
                break;
            } catch (javax.jdo.JDOException e) {
                caughtException = (javax.jdo.JDOException) e.getCause();
            } catch (UndeclaredThrowableException e) {
                throw e.getCause();
            } catch (InvocationTargetException e) {
                Throwable t = e.getTargetException();
                if (t instanceof JDOException) {
                    caughtException = (JDOException) e.getTargetException();
                    reloadConfOnJdoException = true;
                    LOG.error("rawstore jdoexception:" + caughtException.toString());
                } else {
                    throw e.getCause();
                }
            }

            if (retryCount >= retryLimit) {
                throw caughtException;
            }

            assert (retryInterval >= 0);
            retryCount++;
            LOG.error(
                String.format(
                    "JDO datastore error. Retrying metastore command " +
                    "after %d ms (attempt %d of %d)", retryInterval, retryCount, retryLimit));
            Thread.sleep(retryInterval);
            // If we have a connection error, the JDO connection URL hook might
            // provide us with a new URL to access the datastore.
            String lastUrl = getConnectionURL(getConf());
            gotNewConnectUrl = updateConnectionURL(getConf(), lastUrl);
        }
        return ret;
    }
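To isolate the unwrap-then-retry idea from the Hive specifics, here is a stripped-down sketch using only the JDK proxy API. `SketchStore`, `TransientStoreException`, `FlakyStore`, and `RetryingHandler` are hypothetical names, the sleep between attempts is omitted, and the exception type stands in for JDOException.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Hypothetical store interface and exception type, used only for this sketch.
interface SketchStore {
    String getTable(String name);
}

class TransientStoreException extends RuntimeException {
    TransientStoreException(String message) { super(message); }
}

// Fails a fixed number of times before succeeding, to simulate a flaky backend.
class FlakyStore implements SketchStore {
    private int failuresLeft;
    int calls = 0;

    FlakyStore(int failures) { this.failuresLeft = failures; }

    @Override
    public String getTable(String name) {
        calls++;
        if (failuresLeft > 0) {
            failuresLeft--;
            throw new TransientStoreException("backend unavailable");
        }
        return "table:" + name;
    }
}

class RetryingHandler implements InvocationHandler {
    private final Object base;
    private final int retryLimit;

    RetryingHandler(Object base, int retryLimit) {
        this.base = base;
        this.retryLimit = retryLimit;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        int retryCount = 0;
        while (true) {
            try {
                return method.invoke(base, args);
            } catch (InvocationTargetException e) {
                // Reflection wraps whatever the target threw; unwrap before deciding.
                Throwable target = e.getTargetException();
                if (!(target instanceof TransientStoreException) || retryCount >= retryLimit) {
                    throw target;
                }
                retryCount++;
            }
        }
    }

    // Wraps a store in a proxy that retries transient failures up to retryLimit times.
    static SketchStore wrap(SketchStore base, int retryLimit) {
        return (SketchStore) Proxy.newProxyInstance(
            SketchStore.class.getClassLoader(),
            new Class<?>[] {SketchStore.class},
            new RetryingHandler(base, retryLimit));
    }
}
```

The key point is the same as in the modified invoke above: the retry decision has to be made on the unwrapped target exception, because the handler never sees the original exception type directly.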
The RawStore is initialized in two ways. The first is in the RetryingRawStore constructor, which calls

    this.base = (RawStore) ReflectionUtils.newInstance(rawStoreClass, conf);

Because ObjectStore implements Configurable, newInstance calls its setConf(conf) method, which initializes the RawStore. The second is the retry after an exception has been caught, which also calls base.setConf(getConf()):
    private void initMS() {
        base.setConf(getConf());
    }
In ObjectStore's setConf method, the lock guarding the PersistenceManagerFactory is acquired first, then the pm is closed and set to null, and finally the pm is initialized again:
    public void setConf(Configuration conf) {
        // Although an instance of ObjectStore is accessed by one thread, there may
        // be many threads with ObjectStore instances. So the static variables
        // pmf and prop need to be protected with locks.
        pmfPropLock.lock();
        try {
            isInitialized = false;
            hiveConf = conf;
            Properties propsFromConf = getDataSourceProps(conf);
            boolean propsChanged = !propsFromConf.equals(prop);

            if (propsChanged) {
                pmf = null;
                prop = null;
            }

            assert(!isActiveTransaction());
            shutdown();
            // Always want to re-create pm as we don't know if it were created by the
            // most recent instance of the pmf
            pm = null;
            openTrasactionCalls = 0;
            currentTransaction = null;
            transactionStatus = TXN_STATUS.NO_STATE;

            initialize(propsFromConf);

            if (!isInitialized) {
                throw new RuntimeException(
                    "Unable to create persistence manager. Check dss.log for details");
            } else {
                LOG.info("Initialized ObjectStore");
            }
        } finally {
            pmfPropLock.unlock();
        }
    }

    private void initialize(Properties dsProps) {
        LOG.info("ObjectStore, initialize called");
        prop = dsProps;
        pm = getPersistenceManager();
        isInitialized = pm != null;
        return;
    }
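The close-then-recreate sequence inside setConf is what lets a retry recover from a closed pm. A minimal sketch of that sequence follows, with hypothetical names throughout (`SketchManager` stands in for the PersistenceManager, `ReinitStore` for ObjectStore); it only models the open/closed state and does not touch JDO.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for a PersistenceManager.
class SketchManager {
    private boolean closed = false;
    void close() { closed = true; }
    boolean isClosed() { return closed; }
}

class ReinitStore {
    // Shared across instances, in the spirit of the lock around the static pmf/prop.
    private static final Lock INIT_LOCK = new ReentrantLock();
    private SketchManager manager;

    ReinitStore() {
        reinitialize();
    }

    // Mirrors ObjectStore.shutdown(): closes the manager but keeps the reference.
    void shutdown() {
        if (manager != null) {
            manager.close();
        }
    }

    // Mirrors the setConf() flow: close the old manager, drop it, create a new one.
    void reinitialize() {
        INIT_LOCK.lock();
        try {
            shutdown();
            manager = null;
            manager = new SketchManager();
        } finally {
            INIT_LOCK.unlock();
        }
    }

    boolean isUsable() {
        return manager != null && !manager.isClosed();
    }
}
```

After `shutdown()` the store still holds a closed manager and is unusable until `reinitialize()` runs, which is the state a pooled thread can be left in.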
Back to the error at the beginning: how did the Persistence Manager get closed? After careful investigation I found that HCatalog explicitly calls close() on its HiveMetaStoreClient when it is done with it, whereas Hive itself generally never calls this method internally.
HiveMetaStoreClient.java
    public void close() {
        isConnected = false;
        try {
            if (null != client) {
                client.shutdown();
            }
        } catch (TException e) {
            LOG.error("Unable to shutdown local metastore client", e);
        }
        // Transport would have got closed via client.shutdown(), so we dont need this, but
        // just in case, we make this call.
        if ((transport != null) && transport.isOpen()) {
            transport.close();
        }
    }
On the server side, this maps to the shutdown method in HMSHandler:
    @Override
    public void shutdown() {
        logInfo("Shutting down the object store...");
        RawStore ms = threadLocalMS.get();
        if (ms != null) {
            ms.shutdown();
            ms = null;
        }
        logInfo("Metastore shutdown complete.");
    }
which in turn calls ObjectStore's shutdown method:
    public void shutdown() {
        if (pm != null) {
            pm.close();
        }
    }
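As we can see, HMSHandler's shutdown only takes the current thread's ObjectStore and calls its shutdown method, which closes the pm. It does not destroy the ObjectStore, which stays in threadLocalMS. Setting the local variable ms to null has no effect on the thread-local value. The next time this thread serves another request, getMS() returns the same ObjectStore, and because its pm has already been closed, an exception is inevitably thrown. The correct approach is to add threadLocalMS.remove() or threadLocalMS.set(null). remove() deletes the entry from the thread's ThreadLocalMap, while set(null) keeps the entry but makes getMS() see null. Either way, the next getMS() call on that thread creates a fresh RawStore.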
The modified shutdown method:
    public void shutdown() {
        logInfo("Shutting down the object store...");
        RawStore ms = threadLocalMS.get();
        if (ms != null) {
            ms.shutdown();
            ms = null;
            threadLocalMS.remove();
        }
        logInfo("Metastore shutdown complete.");
    }
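The difference between the two shutdown variants can be reproduced without Hive by serving two "sessions" on the same pooled thread. This is a sketch under stated assumptions: `ClosableStore`, `SketchHandler`, and `StaleThreadLocalDemo` are hypothetical names, and a single-thread executor stands in for the Thrift worker pool.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for ObjectStore: usable until shutdown() closes it.
class ClosableStore {
    private boolean open = true;

    void shutdown() { open = false; }

    String getTable(String name) {
        if (!open) {
            throw new IllegalStateException("Persistence Manager has been closed");
        }
        return "table:" + name;
    }
}

// Hypothetical stand-in for HMSHandler with a switch for the fix.
class SketchHandler {
    private final ThreadLocal<ClosableStore> threadLocalStore = new ThreadLocal<>();
    private final boolean removeOnShutdown;

    SketchHandler(boolean removeOnShutdown) {
        this.removeOnShutdown = removeOnShutdown;
    }

    ClosableStore getStore() {
        ClosableStore store = threadLocalStore.get();
        if (store == null) {
            store = new ClosableStore();
            threadLocalStore.set(store);
        }
        return store;
    }

    void shutdown() {
        ClosableStore store = threadLocalStore.get();
        if (store != null) {
            store.shutdown();
            if (removeOnShutdown) {
                // The fix: forget the closed store so the next request creates a new one.
                threadLocalStore.remove();
            }
        }
    }
}

class StaleThreadLocalDemo {
    // Runs two sessions on one pooled thread; the first ends with shutdown().
    // Returns true if the second session can still use its store.
    static boolean secondRequestSucceeds(boolean removeOnShutdown) {
        SketchHandler handler = new SketchHandler(removeOnShutdown);
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            pool.submit(() -> {
                handler.getStore().getTable("a");
                handler.shutdown();
            }).get();
            pool.submit(() -> handler.getStore().getTable("b")).get();
            return true;
        } catch (ExecutionException e) {
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

With `removeOnShutdown` set to false, the second session picks up the closed store left behind by the first and fails. With it set to true, the thread starts from a clean slate.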
Hive Metastore ObjectStore: analysis of the PersistenceManager auto-close bug