博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
聊聊storm的OpaquePartitionedTridentSpoutExecutor
阅读量:5997 次
发布时间:2019-06-20

本文共 22462 字,大约阅读时间需要 74 分钟。

本文主要研究一下storm的OpaquePartitionedTridentSpoutExecutor

TridentTopology.newStream

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/TridentTopology.java

public Stream newStream(String txId, IOpaquePartitionedTridentSpout spout) {        return newStream(txId, new OpaquePartitionedTridentSpoutExecutor(spout));    }
  • TridentTopology.newStream方法,对于IOpaquePartitionedTridentSpout类型的spout会使用OpaquePartitionedTridentSpoutExecutor来包装;而KafkaTridentSpoutOpaque则实现了IOpaquePartitionedTridentSpout接口

TridentTopologyBuilder.buildTopology

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/TridentTopologyBuilder.java

public StormTopology buildTopology(Map
masterCoordResources) { TopologyBuilder builder = new TopologyBuilder(); Map
batchIdsForSpouts = fleshOutStreamBatchIds(false); Map
batchIdsForBolts = fleshOutStreamBatchIds(true); Map
> batchesToCommitIds = new HashMap<>(); Map
> batchesToSpouts = new HashMap<>(); for(String id: _spouts.keySet()) { TransactionalSpoutComponent c = _spouts.get(id); if(c.spout instanceof IRichSpout) { //TODO: wrap this to set the stream name builder.setSpout(id, (IRichSpout) c.spout, c.parallelism); } else { String batchGroup = c.batchGroupId; if(!batchesToCommitIds.containsKey(batchGroup)) { batchesToCommitIds.put(batchGroup, new ArrayList
()); } batchesToCommitIds.get(batchGroup).add(c.commitStateId); if(!batchesToSpouts.containsKey(batchGroup)) { batchesToSpouts.put(batchGroup, new ArrayList
()); } batchesToSpouts.get(batchGroup).add((ITridentSpout) c.spout); BoltDeclarer scd = builder.setBolt(spoutCoordinator(id), new TridentSpoutCoordinator(c.commitStateId, (ITridentSpout) c.spout)) .globalGrouping(masterCoordinator(c.batchGroupId), MasterBatchCoordinator.BATCH_STREAM_ID) .globalGrouping(masterCoordinator(c.batchGroupId), MasterBatchCoordinator.SUCCESS_STREAM_ID); for(Map
m: c.componentConfs) { scd.addConfigurations(m); } Map
specs = new HashMap(); specs.put(c.batchGroupId, new CoordSpec()); BoltDeclarer bd = builder.setBolt(id, new TridentBoltExecutor( new TridentSpoutExecutor( c.commitStateId, c.streamName, ((ITridentSpout) c.spout)), batchIdsForSpouts, specs), c.parallelism); bd.allGrouping(spoutCoordinator(id), MasterBatchCoordinator.BATCH_STREAM_ID); bd.allGrouping(masterCoordinator(batchGroup), MasterBatchCoordinator.SUCCESS_STREAM_ID); if(c.spout instanceof ICommitterTridentSpout) { bd.allGrouping(masterCoordinator(batchGroup), MasterBatchCoordinator.COMMIT_STREAM_ID); } for(Map
m: c.componentConfs) { bd.addConfigurations(m); } } } //...... return builder.createTopology(); }
  • TridentTopologyBuilder.buildTopology会将IOpaquePartitionedTridentSpout(OpaquePartitionedTridentSpoutExecutor)使用TridentSpoutExecutor包装,然后再使用TridentBoltExecutor包装为bolt

OpaquePartitionedTridentSpoutExecutor

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java

public class OpaquePartitionedTridentSpoutExecutor implements ICommitterTridentSpout {    protected final Logger LOG = LoggerFactory.getLogger(OpaquePartitionedTridentSpoutExecutor.class);    IOpaquePartitionedTridentSpout
_spout; //...... public OpaquePartitionedTridentSpoutExecutor(IOpaquePartitionedTridentSpout
spout) { _spout = spout; } @Override public ITridentSpout.BatchCoordinator
getCoordinator(String txStateId, Map conf, TopologyContext context) { return new Coordinator(conf, context); } @Override public ICommitterTridentSpout.Emitter getEmitter(String txStateId, Map conf, TopologyContext context) { return new Emitter(txStateId, conf, context); } @Override public Fields getOutputFields() { return _spout.getOutputFields(); } @Override public Map
getComponentConfiguration() { return _spout.getComponentConfiguration(); } }
  • OpaquePartitionedTridentSpoutExecutor实现了ICommitterTridentSpout,这里getCoordinator返回的是ITridentSpout.BatchCoordinator,getEmitter返回的是ICommitterTridentSpout.Emitter

ITridentSpout.BatchCoordinator

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java

public class Coordinator implements ITridentSpout.BatchCoordinator {        IOpaquePartitionedTridentSpout.Coordinator _coordinator;        public Coordinator(Map conf, TopologyContext context) {            _coordinator = _spout.getCoordinator(conf, context);        }                @Override        public Object initializeTransaction(long txid, Object prevMetadata, Object currMetadata) {            LOG.debug("Initialize Transaction. [txid = {}], [prevMetadata = {}], [currMetadata = {}]", txid, prevMetadata, currMetadata);            return _coordinator.getPartitionsForBatch();        }        @Override        public void close() {            LOG.debug("Closing");            _coordinator.close();            LOG.debug("Closed");        }        @Override        public void success(long txid) {            LOG.debug("Success [txid = {}]", txid);        }        @Override        public boolean isReady(long txid) {            boolean ready = _coordinator.isReady(txid);            LOG.debug("[isReady = {}], [txid = {}]", ready, txid);            return ready;        }    }
  • 包装了spout的_coordinator,它的类型IOpaquePartitionedTridentSpout.Coordinator,这里仅仅是多了debug日志

ICommitterTridentSpout.Emitter

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java

public class Emitter implements ICommitterTridentSpout.Emitter {                IOpaquePartitionedTridentSpout.Emitter
_emitter; TransactionalState _state; TreeMap
> _cachedMetas = new TreeMap<>(); Map
_partitionStates = new HashMap<>(); int _index; int _numTasks; public Emitter(String txStateId, Map conf, TopologyContext context) { _emitter = _spout.getEmitter(conf, context); _index = context.getThisTaskIndex(); _numTasks = context.getComponentTasks(context.getThisComponentId()).size(); _state = TransactionalState.newUserState(conf, txStateId); LOG.debug("Created {}", this); } Object _savedCoordinatorMeta = null; boolean _changedMeta = false; @Override public void emitBatch(TransactionAttempt tx, Object coordinatorMeta, TridentCollector collector) { LOG.debug("Emitting Batch. [transaction = {}], [coordinatorMeta = {}], [collector = {}], [{}]", tx, coordinatorMeta, collector, this); if(_savedCoordinatorMeta==null || !_savedCoordinatorMeta.equals(coordinatorMeta)) { _partitionStates.clear(); final List
taskPartitions = _emitter.getPartitionsForTask(_index, _numTasks, coordinatorMeta); for (ISpoutPartition partition : taskPartitions) { _partitionStates.put(partition.getId(), new EmitterPartitionState(new RotatingTransactionalState(_state, partition.getId()), partition)); } // refresh all partitions for backwards compatibility with old spout _emitter.refreshPartitions(_emitter.getOrderedPartitions(coordinatorMeta)); _savedCoordinatorMeta = coordinatorMeta; _changedMeta = true; } Map
metas = new HashMap<>(); _cachedMetas.put(tx.getTransactionId(), metas); Entry
> entry = _cachedMetas.lowerEntry(tx.getTransactionId()); Map
prevCached; if(entry!=null) { prevCached = entry.getValue(); } else { prevCached = new HashMap<>(); } for(Entry
e: _partitionStates.entrySet()) { String id = e.getKey(); EmitterPartitionState s = e.getValue(); s.rotatingState.removeState(tx.getTransactionId()); Object lastMeta = prevCached.get(id); if(lastMeta==null) lastMeta = s.rotatingState.getLastState(); Object meta = _emitter.emitPartitionBatch(tx, collector, s.partition, lastMeta); metas.put(id, meta); } LOG.debug("Emitted Batch. [transaction = {}], [coordinatorMeta = {}], [collector = {}], [{}]", tx, coordinatorMeta, collector, this); } @Override public void success(TransactionAttempt tx) { for(EmitterPartitionState state: _partitionStates.values()) { state.rotatingState.cleanupBefore(tx.getTransactionId()); } LOG.debug("Success transaction {}. [{}]", tx, this); } @Override public void commit(TransactionAttempt attempt) { LOG.debug("Committing transaction {}. [{}]", attempt, this); // this code here handles a case where a previous commit failed, and the partitions // changed since the last commit. This clears out any state for the removed partitions // for this txid. // we make sure only a single task ever does this. we're also guaranteed that // it's impossible for there to be another writer to the directory for that partition // because only a single commit can be happening at once. this is because in order for // another attempt of the batch to commit, the batch phase must have succeeded in between. // hence, all tasks for the prior commit must have finished committing (whether successfully or not) if(_changedMeta && _index==0) { Set
validIds = new HashSet<>(); for(ISpoutPartition p: _emitter.getOrderedPartitions(_savedCoordinatorMeta)) { validIds.add(p.getId()); } for(String existingPartition: _state.list("")) { if(!validIds.contains(existingPartition)) { RotatingTransactionalState s = new RotatingTransactionalState(_state, existingPartition); s.removeState(attempt.getTransactionId()); } } _changedMeta = false; } Long txid = attempt.getTransactionId(); Map
metas = _cachedMetas.remove(txid); for(Entry
entry: metas.entrySet()) { _partitionStates.get(entry.getKey()).rotatingState.overrideState(txid, entry.getValue()); } LOG.debug("Exiting commit method for transaction {}. [{}]", attempt, this); } @Override public void close() { LOG.debug("Closing"); _emitter.close(); LOG.debug("Closed"); } @Override public String toString() { return "Emitter{" + ", _state=" + _state + ", _cachedMetas=" + _cachedMetas + ", _partitionStates=" + _partitionStates + ", _index=" + _index + ", _numTasks=" + _numTasks + ", _savedCoordinatorMeta=" + _savedCoordinatorMeta + ", _changedMeta=" + _changedMeta + '}'; } } static class EmitterPartitionState { public RotatingTransactionalState rotatingState; public ISpoutPartition partition; public EmitterPartitionState(RotatingTransactionalState s, ISpoutPartition p) { rotatingState = s; partition = p; } }
  • 这里对spout的IOpaquePartitionedTridentSpout.Emitter进行了封装,_partitionStates使用了EmitterPartitionState
  • emitBatch方法首先计算_partitionStates,然后计算prevCached,最后调用_emitter.emitPartitionBatch(tx, collector, s.partition, lastMeta)
  • success方法调用state.rotatingState.cleanupBefore(tx.getTransactionId()),清空该txid之前的状态信息;commit方法主要是更新_partitionStates

KafkaTridentSpoutOpaque

storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java

public class KafkaTridentSpoutOpaque
implements IOpaquePartitionedTridentSpout
>, KafkaTridentSpoutTopicPartition, Map
> { private static final long serialVersionUID = -8003272486566259640L; private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutOpaque.class); private final KafkaTridentSpoutManager
kafkaManager; public KafkaTridentSpoutOpaque(KafkaSpoutConfig
conf) { this(new KafkaTridentSpoutManager<>(conf)); } public KafkaTridentSpoutOpaque(KafkaTridentSpoutManager
kafkaManager) { this.kafkaManager = kafkaManager; LOG.debug("Created {}", this.toString()); } @Override public Emitter
>, KafkaTridentSpoutTopicPartition, Map
> getEmitter( Map conf, TopologyContext context) { return new KafkaTridentSpoutEmitter<>(kafkaManager, context); } @Override public Coordinator
>> getCoordinator(Map conf, TopologyContext context) { return new KafkaTridentSpoutOpaqueCoordinator<>(kafkaManager); } @Override public Map
getComponentConfiguration() { return null; } @Override public Fields getOutputFields() { final Fields outputFields = kafkaManager.getFields(); LOG.debug("OutputFields = {}", outputFields); return outputFields; } @Override public final String toString() { return super.toString() + "{kafkaManager=" + kafkaManager + '}'; }}
  • KafkaTridentSpoutOpaque的getCoordinator返回的是KafkaTridentSpoutOpaqueCoordinator;getEmitter返回的是KafkaTridentSpoutEmitter

KafkaTridentSpoutOpaqueCoordinator

storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinator.java

public class KafkaTridentSpoutOpaqueCoordinator
implements IOpaquePartitionedTridentSpout.Coordinator
>>, Serializable { private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutOpaqueCoordinator.class); private final TopicPartitionSerializer tpSerializer = new TopicPartitionSerializer(); private final KafkaTridentSpoutManager
kafkaManager; public KafkaTridentSpoutOpaqueCoordinator(KafkaTridentSpoutManager
kafkaManager) { this.kafkaManager = kafkaManager; LOG.debug("Created {}", this.toString()); } @Override public boolean isReady(long txid) { LOG.debug("isReady = true"); return true; // the "old" trident kafka spout always returns true, like this } @Override public List
> getPartitionsForBatch() { final ArrayList
topicPartitions = new ArrayList<>(kafkaManager.getTopicPartitions()); LOG.debug("TopicPartitions for batch {}", topicPartitions); List
> tps = new ArrayList<>(); for(TopicPartition tp : topicPartitions) { tps.add(tpSerializer.toMap(tp)); } return tps; } @Override public void close() { LOG.debug("Closed"); // the "old" trident kafka spout is no op like this } @Override public final String toString() { return super.toString() + "{kafkaManager=" + kafkaManager + '}'; }}
  • 这里的isReady始终返回true,getPartitionsForBatch方法主要是将kafkaManager.getTopicPartitions()信息转换为map结构

KafkaTridentSpoutEmitter

storm-kafka-client-1.2.2-sources.jar!/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java

public class KafkaTridentSpoutEmitter
implements IOpaquePartitionedTridentSpout.Emitter< List
>, KafkaTridentSpoutTopicPartition, Map
>, Serializable { private static final long serialVersionUID = -7343927794834130435L; private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutEmitter.class); // Kafka private final KafkaConsumer
kafkaConsumer; // Bookkeeping private final KafkaTridentSpoutManager
kafkaManager; // set of topic-partitions for which first poll has already occurred, and the first polled txid private final Map
firstPollTransaction = new HashMap<>(); // Declare some KafkaTridentSpoutManager references for convenience private final long pollTimeoutMs; private final KafkaSpoutConfig.FirstPollOffsetStrategy firstPollOffsetStrategy; private final RecordTranslator
translator; private final Timer refreshSubscriptionTimer; private final TopicPartitionSerializer tpSerializer = new TopicPartitionSerializer(); private TopologyContext topologyContext; /** * Create a new Kafka spout emitter. * @param kafkaManager The Kafka consumer manager to use * @param topologyContext The topology context * @param refreshSubscriptionTimer The timer for deciding when to recheck the subscription */ public KafkaTridentSpoutEmitter(KafkaTridentSpoutManager
kafkaManager, TopologyContext topologyContext, Timer refreshSubscriptionTimer) { this.kafkaConsumer = kafkaManager.createAndSubscribeKafkaConsumer(topologyContext); this.kafkaManager = kafkaManager; this.topologyContext = topologyContext; this.refreshSubscriptionTimer = refreshSubscriptionTimer; this.translator = kafkaManager.getKafkaSpoutConfig().getTranslator(); final KafkaSpoutConfig
kafkaSpoutConfig = kafkaManager.getKafkaSpoutConfig(); this.pollTimeoutMs = kafkaSpoutConfig.getPollTimeoutMs(); this.firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy(); LOG.debug("Created {}", this.toString()); } /** * Creates instance of this class with default 500 millisecond refresh subscription timer */ public KafkaTridentSpoutEmitter(KafkaTridentSpoutManager
kafkaManager, TopologyContext topologyContext) { this(kafkaManager, topologyContext, new Timer(500, kafkaManager.getKafkaSpoutConfig().getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS)); } //...... @Override public Map
emitPartitionBatch(TransactionAttempt tx, TridentCollector collector, KafkaTridentSpoutTopicPartition currBatchPartition, Map
lastBatch) { LOG.debug("Processing batch: [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], [collector = {}]", tx, currBatchPartition, lastBatch, collector); final TopicPartition currBatchTp = currBatchPartition.getTopicPartition(); final Set
assignments = kafkaConsumer.assignment(); KafkaTridentSpoutBatchMetadata lastBatchMeta = lastBatch == null ? null : KafkaTridentSpoutBatchMetadata.fromMap(lastBatch); KafkaTridentSpoutBatchMetadata currentBatch = lastBatchMeta; Collection
pausedTopicPartitions = Collections.emptySet(); if (assignments == null || !assignments.contains(currBatchPartition.getTopicPartition())) { LOG.warn("SKIPPING processing batch [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], " + "[collector = {}] because it is not part of the assignments {} of consumer instance [{}] " + "of consumer group [{}]", tx, currBatchPartition, lastBatch, collector, assignments, kafkaConsumer, kafkaManager.getKafkaSpoutConfig().getConsumerGroupId()); } else { try { // pause other topic-partitions to only poll from current topic-partition pausedTopicPartitions = pauseTopicPartitions(currBatchTp); seek(currBatchTp, lastBatchMeta, tx.getTransactionId()); // poll if (refreshSubscriptionTimer.isExpiredResetOnTrue()) { kafkaManager.getKafkaSpoutConfig().getSubscription().refreshAssignment(); } final ConsumerRecords
records = kafkaConsumer.poll(pollTimeoutMs); LOG.debug("Polled [{}] records from Kafka.", records.count()); if (!records.isEmpty()) { emitTuples(collector, records); // build new metadata currentBatch = new KafkaTridentSpoutBatchMetadata(currBatchTp, records); } } finally { kafkaConsumer.resume(pausedTopicPartitions); LOG.trace("Resumed topic-partitions {}", pausedTopicPartitions); } LOG.debug("Emitted batch: [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], " + "[currBatchMetadata = {}], [collector = {}]", tx, currBatchPartition, lastBatch, currentBatch, collector); } return currentBatch == null ? null : currentBatch.toMap(); } private void emitTuples(TridentCollector collector, ConsumerRecords
records) { for (ConsumerRecord
record : records) { final List
tuple = translator.apply(record); collector.emit(tuple); LOG.debug("Emitted tuple {} for record [{}]", tuple, record); } } @Override public void refreshPartitions(List
partitionResponsibilities) { LOG.trace("Refreshing of topic-partitions handled by Kafka. " + "No action taken by this method for topic partitions {}", partitionResponsibilities); } /** * Computes ordered list of topic-partitions for this task taking into consideration that topic-partitions * for this task must be assigned to the Kafka consumer running on this task. * * @param allPartitionInfo list of all partitions as returned by {@link KafkaTridentSpoutOpaqueCoordinator} * @return ordered list of topic partitions for this task */ @Override public List
getOrderedPartitions(final List
> allPartitionInfo) { List
allTopicPartitions = new ArrayList<>(); for(Map
map : allPartitionInfo) { allTopicPartitions.add(tpSerializer.fromMap(map)); } final List
allPartitions = newKafkaTridentSpoutTopicPartitions(allTopicPartitions); LOG.debug("Returning all topic-partitions {} across all tasks. Current task index [{}]. Total tasks [{}] ", allPartitions, topologyContext.getThisTaskIndex(), getNumTasks()); return allPartitions; } @Override public List
getPartitionsForTask(int taskId, int numTasks, List
> allPartitionInfo) { final Set
assignedTps = kafkaConsumer.assignment(); LOG.debug("Consumer [{}], running on task with index [{}], has assigned topic-partitions {}", kafkaConsumer, taskId, assignedTps); final List
taskTps = newKafkaTridentSpoutTopicPartitions(assignedTps); LOG.debug("Returning topic-partitions {} for task with index [{}]", taskTps, taskId); return taskTps; } @Override public void close() { kafkaConsumer.close(); LOG.debug("Closed"); } @Override public final String toString() { return super.toString() + "{kafkaManager=" + kafkaManager + '}'; }}
  • 这里的refreshSubscriptionTimer的interval取的是kafkaManager.getKafkaSpoutConfig().getPartitionRefreshPeriodMs(),默认是2000
  • emitPartitionBatch方法没调用一次都会判断refreshSubscriptionTimer.isExpiredResetOnTrue(),如果时间到了,就会调用kafkaManager.getKafkaSpoutConfig().getSubscription().refreshAssignment()刷新assignment
  • emitPartitionBatch方法主要是找到与该batch关联的partition,停止从其他parition拉取消息,然后根据firstPollOffsetStrategy以及lastBatchMeta信息,调用kafkaConsumer的seek相关方法seek到指定位置
  • 之后就是用kafkaConsumer.poll(pollTimeoutMs)拉取数据,然后emitTuples;emitTuples方法会是用translator转换数据,然后调用collector.emit发射出去
  • refreshPartitions方法目前仅仅是trace下日志;getOrderedPartitions方法先将allPartitionInfo的数据从map结构反序列化回来,然后转换为KafkaTridentSpoutTopicPartition返回;getPartitionsForTask方法主要是通过kafkaConsumer.assignment()的信息转换为KafkaTridentSpoutTopicPartition返回

小结

  • storm-kafka-client提供了KafkaTridentSpoutOpaque这个spout作为trident的kafka spout(旧版的为OpaqueTridentKafkaSpout,在storm-kafka类库中),它实现了IOpaquePartitionedTridentSpout接口
  • TridentTopology.newStream方法,对于IOpaquePartitionedTridentSpout类型的spout会使用OpaquePartitionedTridentSpoutExecutor来包装;TridentTopologyBuilder.buildTopology会将IOpaquePartitionedTridentSpout(OpaquePartitionedTridentSpoutExecutor)先使用TridentSpoutExecutor包装,然后再使用TridentBoltExecutor包装为bolt
  • OpaquePartitionedTridentSpoutExecutor的getCoordinator返回的是ITridentSpout.BatchCoordinator,getEmitter返回的是ICommitterTridentSpout.Emitter;他们分别对KafkaTridentSpoutOpaque这个原始spout返回的KafkaTridentSpoutOpaqueCoordinator以及KafkaTridentSpoutEmitter进行包装再处理;其中对coordinator加了debug日志,对emitter则主要多了对EmitterPartitionState的存取

doc

转载地址:http://ywhlx.baihongyu.com/

你可能感兴趣的文章
Linux企业应用微博客正式开通
查看>>
64位linux下的gns3网络模拟器配置
查看>>
效果差学费贵售后难,VIPKID米雯娟的野心不能只靠“烧钱”营销
查看>>
Windows Server 2012 R2 WSUS-10:流程概述
查看>>
自动发现服务是怎样工作的?
查看>>
Office 365 系列之七:安装Office 365 ProPlus
查看>>
闲诗一首:《莫追梦》
查看>>
Cisco/H3C交换机配置与管理完全手册(第2版)卓越网正式到货
查看>>
让VMware ESX中的虚拟机随esx开机自动启动
查看>>
rhel6.5解决包的依赖的一个处理方法
查看>>
小功能隐藏着大学问---windows的ACL带来的挑战
查看>>
RSA2012系列(4):网络战揭秘
查看>>
Puppet扩展篇6-通过横向扩展puppetmaster增加架构的灵活性
查看>>
西安OpenParty11月29日活动高清图文回顾——新增西安APEC蓝美图!
查看>>
SFB 项目经验-16-呼叫前客户端性能测试
查看>>
我是如何帮助创业公司改进企业工作的
查看>>
taglist
查看>>
UITabBarController 的使用
查看>>
卡特兰数
查看>>
epoll实现机制分析
查看>>